1use std::collections::HashMap;
4use std::net::IpAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Instant, SystemTime};
8
9use tokio::sync::{broadcast, Mutex, RwLock};
10use tokio_util::sync::CancellationToken;
11use tuitbot_core::automation::circuit_breaker::CircuitBreaker;
12use tuitbot_core::automation::Runtime;
13use tuitbot_core::automation::WatchtowerLoop;
14use tuitbot_core::config::{
15 effective_config, Config, ConnectorConfig, ContentSourcesConfig, DeploymentMode,
16};
17use tuitbot_core::content::ContentGenerator;
18use tuitbot_core::llm::factory::create_provider;
19use tuitbot_core::storage::accounts::{self, DEFAULT_ACCOUNT_ID};
20use tuitbot_core::storage::DbPool;
21use tuitbot_core::x_api::auth::TokenManager;
22
23use tuitbot_core::error::XApiError;
24use tuitbot_core::x_api::auth;
25
26use crate::ws::AccountWsEvent;
27
28pub struct PendingOAuth {
30 pub code_verifier: String,
32 pub created_at: Instant,
34 pub account_id: String,
36}
37
38pub struct AppState {
40 pub db: DbPool,
42 pub config_path: PathBuf,
44 pub data_dir: PathBuf,
46 pub event_tx: broadcast::Sender<AccountWsEvent>,
48 pub api_token: String,
50 pub passphrase_hash: RwLock<Option<String>>,
52 pub passphrase_hash_mtime: RwLock<Option<SystemTime>>,
54 pub bind_host: String,
56 pub bind_port: u16,
58 pub login_attempts: Mutex<HashMap<IpAddr, (u32, Instant)>>,
60 pub runtimes: Mutex<HashMap<String, Runtime>>,
62 pub content_generators: Mutex<HashMap<String, Arc<ContentGenerator>>>,
64 pub circuit_breaker: Option<Arc<CircuitBreaker>>,
66 pub watchtower_cancel: RwLock<Option<CancellationToken>>,
68 pub content_sources: RwLock<ContentSourcesConfig>,
70 pub connector_config: ConnectorConfig,
72 pub deployment_mode: DeploymentMode,
74 pub pending_oauth: Mutex<HashMap<String, PendingOAuth>>,
76 pub token_managers: Mutex<HashMap<String, Arc<TokenManager>>>,
78 pub x_client_id: String,
80}
81
82impl AppState {
83 pub async fn get_x_access_token(
88 &self,
89 token_path: &std::path::Path,
90 account_id: &str,
91 ) -> Result<String, XApiError> {
92 {
94 let managers = self.token_managers.lock().await;
95 if let Some(tm) = managers.get(account_id) {
96 return tm.get_access_token().await;
97 }
98 }
99
100 let tokens = auth::load_tokens(token_path)?.ok_or(XApiError::AuthExpired)?;
102
103 let tm = Arc::new(TokenManager::new(
104 tokens,
105 self.x_client_id.clone(),
106 token_path.to_path_buf(),
107 ));
108
109 let access_token = tm.get_access_token().await?;
110
111 self.token_managers
112 .lock()
113 .await
114 .insert(account_id.to_string(), tm);
115
116 Ok(access_token)
117 }
118
119 pub async fn load_effective_config(&self, account_id: &str) -> Result<Config, String> {
124 let contents = std::fs::read_to_string(&self.config_path).unwrap_or_default();
125 let base: Config = toml::from_str(&contents).unwrap_or_default();
126
127 if account_id == DEFAULT_ACCOUNT_ID {
128 return Ok(base);
129 }
130
131 let account = accounts::get_account(&self.db, account_id)
132 .await
133 .map_err(|e| e.to_string())?
134 .ok_or_else(|| format!("account not found: {account_id}"))?;
135
136 effective_config(&base, &account.config_overrides)
137 .map(|r| r.config)
138 .map_err(|e| e.to_string())
139 }
140
141 pub async fn get_or_create_content_generator(
145 &self,
146 account_id: &str,
147 ) -> Result<Arc<ContentGenerator>, String> {
148 {
150 let generators = self.content_generators.lock().await;
151 if let Some(gen) = generators.get(account_id) {
152 return Ok(gen.clone());
153 }
154 }
155
156 let config = self.load_effective_config(account_id).await?;
157
158 let provider =
159 create_provider(&config.llm).map_err(|e| format!("LLM not configured: {e}"))?;
160
161 let gen = Arc::new(ContentGenerator::new(provider, config.business));
162
163 self.content_generators
164 .lock()
165 .await
166 .insert(account_id.to_string(), gen.clone());
167
168 Ok(gen)
169 }
170
171 pub async fn restart_watchtower(&self) {
177 if let Some(cancel) = self.watchtower_cancel.write().await.take() {
179 cancel.cancel();
180 tracing::info!("Watchtower cancelled for config reload");
181 }
182
183 let loaded_config = Config::load(Some(&self.config_path.to_string_lossy())).ok();
185 let new_sources = loaded_config
186 .as_ref()
187 .map(|c| c.content_sources.clone())
188 .unwrap_or_default();
189 let connector_config = loaded_config
190 .as_ref()
191 .map(|c| c.connectors.clone())
192 .unwrap_or_default();
193 let deployment_mode = loaded_config
194 .as_ref()
195 .map(|c| c.deployment_mode.clone())
196 .unwrap_or_default();
197
198 let has_enabled: Vec<_> = new_sources
200 .sources
201 .iter()
202 .filter(|s| {
203 s.is_enabled()
204 && deployment_mode.allows_source_type(&s.source_type)
205 && (s.path.is_some() || s.folder_id.is_some())
206 })
207 .collect();
208
209 if has_enabled.is_empty() {
210 tracing::info!("Watchtower restart: no enabled sources, not spawning");
211 *self.content_sources.write().await = new_sources;
212 return;
213 }
214
215 let cancel = CancellationToken::new();
217 let watchtower = WatchtowerLoop::new(
218 self.db.clone(),
219 new_sources.clone(),
220 connector_config,
221 self.data_dir.clone(),
222 );
223 let cancel_clone = cancel.clone();
224 tokio::spawn(async move {
225 watchtower.run(cancel_clone).await;
226 });
227
228 tracing::info!(
229 sources = has_enabled.len(),
230 "Watchtower restarted with updated config"
231 );
232
233 *self.watchtower_cancel.write().await = Some(cancel);
235 *self.content_sources.write().await = new_sources;
236 }
237}