1use std::collections::HashMap;
4use std::net::IpAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Instant, SystemTime};
8
9use tokio::sync::{broadcast, Mutex, RwLock};
10use tokio_util::sync::CancellationToken;
11use tuitbot_core::automation::circuit_breaker::CircuitBreaker;
12use tuitbot_core::automation::Runtime;
13use tuitbot_core::automation::WatchtowerLoop;
14use tuitbot_core::config::{
15 effective_config, Config, ConnectorConfig, ContentSourcesConfig, DeploymentMode,
16};
17use tuitbot_core::content::ContentGenerator;
18use tuitbot_core::context::semantic_index::SemanticIndex;
19use tuitbot_core::llm::embedding::EmbeddingProvider;
20use tuitbot_core::llm::factory::create_provider;
21use tuitbot_core::storage::accounts::{self, DEFAULT_ACCOUNT_ID};
22use tuitbot_core::storage::DbPool;
23use tuitbot_core::x_api::auth::TokenManager;
24use tuitbot_core::x_api::ScraperHealth;
25
26use tuitbot_core::error::XApiError;
27use tuitbot_core::x_api::auth;
28
29use crate::ws::AccountWsEvent;
30
/// Bookkeeping for an OAuth flow that has been started but not yet finished.
///
/// Values of this type are stored in `AppState::pending_oauth`, keyed by a
/// `String` (presumably the OAuth `state` parameter — confirm against the
/// handler that inserts them).
pub struct PendingOAuth {
    /// Code verifier recorded when the flow was started (by its name, the
    /// PKCE verifier needed for the later token exchange — confirm).
    pub code_verifier: String,

    /// Monotonic timestamp taken when this entry was created.
    pub created_at: Instant,

    /// ID of the account this flow belongs to.
    pub account_id: String,

    /// Client ID associated with this flow.
    pub client_id: String,
}
42
43pub struct AppState {
45 pub db: DbPool,
47 pub config_path: PathBuf,
49 pub data_dir: PathBuf,
51 pub event_tx: broadcast::Sender<AccountWsEvent>,
53 pub api_token: String,
55 pub passphrase_hash: RwLock<Option<String>>,
57 pub passphrase_hash_mtime: RwLock<Option<SystemTime>>,
59 pub bind_host: String,
61 pub bind_port: u16,
63 pub login_attempts: Mutex<HashMap<IpAddr, (u32, Instant)>>,
65 pub runtimes: Mutex<HashMap<String, Runtime>>,
67 pub content_generators: Mutex<HashMap<String, Arc<ContentGenerator>>>,
69 pub circuit_breaker: Option<Arc<CircuitBreaker>>,
71 pub scraper_health: Option<ScraperHealth>,
73 pub watchtower_cancel: RwLock<Option<CancellationToken>>,
75 pub content_sources: RwLock<ContentSourcesConfig>,
77 pub connector_config: ConnectorConfig,
79 pub deployment_mode: DeploymentMode,
81 pub pending_oauth: Mutex<HashMap<String, PendingOAuth>>,
83 pub token_managers: Mutex<HashMap<String, Arc<TokenManager>>>,
85 pub x_client_id: String,
87 pub semantic_index: Option<Arc<RwLock<SemanticIndex>>>,
89 pub embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
91}
92
93impl AppState {
94 pub async fn get_x_access_token(
99 &self,
100 token_path: &std::path::Path,
101 account_id: &str,
102 ) -> Result<String, XApiError> {
103 {
105 let managers = self.token_managers.lock().await;
106 if let Some(tm) = managers.get(account_id) {
107 return tm.get_access_token().await;
108 }
109 }
110
111 let tokens = auth::load_tokens(token_path)?.ok_or(XApiError::AuthExpired)?;
113
114 let tm = Arc::new(TokenManager::new(
115 tokens,
116 self.x_client_id.clone(),
117 token_path.to_path_buf(),
118 ));
119
120 let access_token = tm.get_access_token().await?;
121
122 self.token_managers
123 .lock()
124 .await
125 .insert(account_id.to_string(), tm);
126
127 Ok(access_token)
128 }
129
130 pub async fn load_effective_config(&self, account_id: &str) -> Result<Config, String> {
135 let contents = std::fs::read_to_string(&self.config_path).unwrap_or_default();
136 let base: Config = toml::from_str(&contents).unwrap_or_default();
137
138 if account_id == DEFAULT_ACCOUNT_ID {
139 return Ok(base);
140 }
141
142 let account = accounts::get_account(&self.db, account_id)
143 .await
144 .map_err(|e| e.to_string())?
145 .ok_or_else(|| format!("account not found: {account_id}"))?;
146
147 effective_config(&base, &account.config_overrides)
148 .map(|r| r.config)
149 .map_err(|e| e.to_string())
150 }
151
152 pub async fn get_or_create_content_generator(
156 &self,
157 account_id: &str,
158 ) -> Result<Arc<ContentGenerator>, String> {
159 {
161 let generators = self.content_generators.lock().await;
162 if let Some(gen) = generators.get(account_id) {
163 return Ok(gen.clone());
164 }
165 }
166
167 let config = self.load_effective_config(account_id).await?;
168
169 let provider =
170 create_provider(&config.llm).map_err(|e| format!("LLM not configured: {e}"))?;
171
172 let gen = Arc::new(ContentGenerator::new(provider, config.business));
173
174 self.content_generators
175 .lock()
176 .await
177 .insert(account_id.to_string(), gen.clone());
178
179 Ok(gen)
180 }
181
182 pub fn is_local_first(&self) -> bool {
184 self.deployment_mode.is_local_first()
185 }
186
187 pub async fn restart_watchtower(&self) {
193 if let Some(cancel) = self.watchtower_cancel.write().await.take() {
195 cancel.cancel();
196 tracing::info!("Watchtower cancelled for config reload");
197 }
198
199 let loaded_config = Config::load(Some(&self.config_path.to_string_lossy())).ok();
201 let new_sources = loaded_config
202 .as_ref()
203 .map(|c| c.content_sources.clone())
204 .unwrap_or_default();
205 let connector_config = loaded_config
206 .as_ref()
207 .map(|c| c.connectors.clone())
208 .unwrap_or_default();
209 let deployment_mode = loaded_config
210 .as_ref()
211 .map(|c| c.deployment_mode.clone())
212 .unwrap_or_default();
213
214 let has_enabled: Vec<_> = new_sources
216 .sources
217 .iter()
218 .filter(|s| {
219 s.is_enabled()
220 && deployment_mode.allows_source_type(&s.source_type)
221 && (s.path.is_some() || s.folder_id.is_some())
222 })
223 .collect();
224
225 if has_enabled.is_empty() {
226 tracing::info!("Watchtower restart: no enabled sources, not spawning");
227 *self.content_sources.write().await = new_sources;
228 return;
229 }
230
231 if !deployment_mode.is_local_first()
234 && has_enabled.iter().any(|s| s.source_type == "local_fs")
235 {
236 tracing::info!(
237 mode = %deployment_mode,
238 "local_fs source in {} mode — data is user-controlled but not local-first",
239 deployment_mode
240 );
241 }
242
243 let cancel = CancellationToken::new();
245 let watchtower = WatchtowerLoop::new(
246 self.db.clone(),
247 new_sources.clone(),
248 connector_config,
249 self.data_dir.clone(),
250 );
251 let cancel_clone = cancel.clone();
252 tokio::spawn(async move {
253 watchtower.run(cancel_clone).await;
254 });
255
256 tracing::info!(
257 sources = has_enabled.len(),
258 "Watchtower restarted with updated config"
259 );
260
261 *self.watchtower_cancel.write().await = Some(cancel);
263 *self.content_sources.write().await = new_sources;
264 }
265}