1use std::collections::HashMap;
4use std::net::IpAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Instant, SystemTime};
8
9use tokio::sync::{broadcast, Mutex, RwLock};
10use tokio_util::sync::CancellationToken;
11use tuitbot_core::automation::circuit_breaker::CircuitBreaker;
12use tuitbot_core::automation::Runtime;
13use tuitbot_core::automation::WatchtowerLoop;
14use tuitbot_core::config::{
15 effective_config, Config, ConnectorConfig, ContentSourcesConfig, DeploymentMode,
16};
17use tuitbot_core::content::ContentGenerator;
18use tuitbot_core::llm::factory::create_provider;
19use tuitbot_core::storage::accounts::{self, DEFAULT_ACCOUNT_ID};
20use tuitbot_core::storage::DbPool;
21use tuitbot_core::x_api::auth::TokenManager;
22use tuitbot_core::x_api::ScraperHealth;
23
24use tuitbot_core::error::XApiError;
25use tuitbot_core::x_api::auth;
26
27use crate::ws::AccountWsEvent;
28
29pub struct PendingOAuth {
31 pub code_verifier: String,
33 pub created_at: Instant,
35 pub account_id: String,
37 pub client_id: String,
39}
40
41pub struct AppState {
43 pub db: DbPool,
45 pub config_path: PathBuf,
47 pub data_dir: PathBuf,
49 pub event_tx: broadcast::Sender<AccountWsEvent>,
51 pub api_token: String,
53 pub passphrase_hash: RwLock<Option<String>>,
55 pub passphrase_hash_mtime: RwLock<Option<SystemTime>>,
57 pub bind_host: String,
59 pub bind_port: u16,
61 pub login_attempts: Mutex<HashMap<IpAddr, (u32, Instant)>>,
63 pub runtimes: Mutex<HashMap<String, Runtime>>,
65 pub content_generators: Mutex<HashMap<String, Arc<ContentGenerator>>>,
67 pub circuit_breaker: Option<Arc<CircuitBreaker>>,
69 pub scraper_health: Option<ScraperHealth>,
71 pub watchtower_cancel: RwLock<Option<CancellationToken>>,
73 pub content_sources: RwLock<ContentSourcesConfig>,
75 pub connector_config: ConnectorConfig,
77 pub deployment_mode: DeploymentMode,
79 pub pending_oauth: Mutex<HashMap<String, PendingOAuth>>,
81 pub token_managers: Mutex<HashMap<String, Arc<TokenManager>>>,
83 pub x_client_id: String,
85}
86
87impl AppState {
88 pub async fn get_x_access_token(
93 &self,
94 token_path: &std::path::Path,
95 account_id: &str,
96 ) -> Result<String, XApiError> {
97 {
99 let managers = self.token_managers.lock().await;
100 if let Some(tm) = managers.get(account_id) {
101 return tm.get_access_token().await;
102 }
103 }
104
105 let tokens = auth::load_tokens(token_path)?.ok_or(XApiError::AuthExpired)?;
107
108 let tm = Arc::new(TokenManager::new(
109 tokens,
110 self.x_client_id.clone(),
111 token_path.to_path_buf(),
112 ));
113
114 let access_token = tm.get_access_token().await?;
115
116 self.token_managers
117 .lock()
118 .await
119 .insert(account_id.to_string(), tm);
120
121 Ok(access_token)
122 }
123
124 pub async fn load_effective_config(&self, account_id: &str) -> Result<Config, String> {
129 let contents = std::fs::read_to_string(&self.config_path).unwrap_or_default();
130 let base: Config = toml::from_str(&contents).unwrap_or_default();
131
132 if account_id == DEFAULT_ACCOUNT_ID {
133 return Ok(base);
134 }
135
136 let account = accounts::get_account(&self.db, account_id)
137 .await
138 .map_err(|e| e.to_string())?
139 .ok_or_else(|| format!("account not found: {account_id}"))?;
140
141 effective_config(&base, &account.config_overrides)
142 .map(|r| r.config)
143 .map_err(|e| e.to_string())
144 }
145
146 pub async fn get_or_create_content_generator(
150 &self,
151 account_id: &str,
152 ) -> Result<Arc<ContentGenerator>, String> {
153 {
155 let generators = self.content_generators.lock().await;
156 if let Some(gen) = generators.get(account_id) {
157 return Ok(gen.clone());
158 }
159 }
160
161 let config = self.load_effective_config(account_id).await?;
162
163 let provider =
164 create_provider(&config.llm).map_err(|e| format!("LLM not configured: {e}"))?;
165
166 let gen = Arc::new(ContentGenerator::new(provider, config.business));
167
168 self.content_generators
169 .lock()
170 .await
171 .insert(account_id.to_string(), gen.clone());
172
173 Ok(gen)
174 }
175
176 pub async fn restart_watchtower(&self) {
182 if let Some(cancel) = self.watchtower_cancel.write().await.take() {
184 cancel.cancel();
185 tracing::info!("Watchtower cancelled for config reload");
186 }
187
188 let loaded_config = Config::load(Some(&self.config_path.to_string_lossy())).ok();
190 let new_sources = loaded_config
191 .as_ref()
192 .map(|c| c.content_sources.clone())
193 .unwrap_or_default();
194 let connector_config = loaded_config
195 .as_ref()
196 .map(|c| c.connectors.clone())
197 .unwrap_or_default();
198 let deployment_mode = loaded_config
199 .as_ref()
200 .map(|c| c.deployment_mode.clone())
201 .unwrap_or_default();
202
203 let has_enabled: Vec<_> = new_sources
205 .sources
206 .iter()
207 .filter(|s| {
208 s.is_enabled()
209 && deployment_mode.allows_source_type(&s.source_type)
210 && (s.path.is_some() || s.folder_id.is_some())
211 })
212 .collect();
213
214 if has_enabled.is_empty() {
215 tracing::info!("Watchtower restart: no enabled sources, not spawning");
216 *self.content_sources.write().await = new_sources;
217 return;
218 }
219
220 let cancel = CancellationToken::new();
222 let watchtower = WatchtowerLoop::new(
223 self.db.clone(),
224 new_sources.clone(),
225 connector_config,
226 self.data_dir.clone(),
227 );
228 let cancel_clone = cancel.clone();
229 tokio::spawn(async move {
230 watchtower.run(cancel_clone).await;
231 });
232
233 tracing::info!(
234 sources = has_enabled.len(),
235 "Watchtower restarted with updated config"
236 );
237
238 *self.watchtower_cancel.write().await = Some(cancel);
240 *self.content_sources.write().await = new_sources;
241 }
242}