Skip to main content

tuitbot_server/
state.rs

1//! Shared application state for the tuitbot server.
2
3use std::collections::HashMap;
4use std::net::IpAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Instant, SystemTime};
8
9use tokio::sync::{broadcast, Mutex, RwLock};
10use tokio_util::sync::CancellationToken;
11use tuitbot_core::automation::circuit_breaker::CircuitBreaker;
12use tuitbot_core::automation::Runtime;
13use tuitbot_core::automation::WatchtowerLoop;
14use tuitbot_core::config::{
15    effective_config, Config, ConnectorConfig, ContentSourcesConfig, DeploymentMode,
16};
17use tuitbot_core::content::ContentGenerator;
18use tuitbot_core::llm::factory::create_provider;
19use tuitbot_core::storage::accounts::{self, DEFAULT_ACCOUNT_ID};
20use tuitbot_core::storage::DbPool;
21use tuitbot_core::x_api::auth::TokenManager;
22use tuitbot_core::x_api::ScraperHealth;
23
24use tuitbot_core::error::XApiError;
25use tuitbot_core::x_api::auth;
26
27use crate::ws::AccountWsEvent;
28
29/// Pending OAuth PKCE state for connector link flows.
30pub struct PendingOAuth {
31    /// The PKCE code verifier needed to complete the token exchange.
32    pub code_verifier: String,
33    /// When this entry was created (for 10-minute expiry).
34    pub created_at: Instant,
35    /// The account ID that initiated this OAuth flow (empty for connectors).
36    pub account_id: String,
37    /// The X API client ID used for this flow (for callback token exchange).
38    pub client_id: String,
39}
40
41/// Shared application state accessible by all route handlers.
42pub struct AppState {
43    /// SQLite connection pool.
44    pub db: DbPool,
45    /// Path to the configuration file.
46    pub config_path: PathBuf,
47    /// Data directory for media storage (parent of config file).
48    pub data_dir: PathBuf,
49    /// Broadcast channel sender for real-time WebSocket events.
50    pub event_tx: broadcast::Sender<AccountWsEvent>,
51    /// Local bearer token for API authentication.
52    pub api_token: String,
53    /// Bcrypt hash of the web login passphrase (None if not configured).
54    pub passphrase_hash: RwLock<Option<String>>,
55    /// Last-observed mtime of the `passphrase_hash` file (for detecting out-of-band resets).
56    pub passphrase_hash_mtime: RwLock<Option<SystemTime>>,
57    /// Host address the server is bound to.
58    pub bind_host: String,
59    /// Port the server is listening on.
60    pub bind_port: u16,
61    /// Per-IP login attempt tracking for rate limiting: (count, window_start).
62    pub login_attempts: Mutex<HashMap<IpAddr, (u32, Instant)>>,
63    /// Per-account automation runtimes (keyed by account_id).
64    pub runtimes: Mutex<HashMap<String, Runtime>>,
65    /// Per-account content generators for AI assist endpoints.
66    pub content_generators: Mutex<HashMap<String, Arc<ContentGenerator>>>,
67    /// Optional circuit breaker for X API rate-limit protection.
68    pub circuit_breaker: Option<Arc<CircuitBreaker>>,
69    /// Optional scraper health tracker (populated when provider_backend = "scraper").
70    pub scraper_health: Option<ScraperHealth>,
71    /// Cancellation token for the Watchtower filesystem watcher (None if not running).
72    pub watchtower_cancel: RwLock<Option<CancellationToken>>,
73    /// Content sources configuration for the Watchtower.
74    pub content_sources: RwLock<ContentSourcesConfig>,
75    /// Connector configuration for remote source OAuth flows.
76    pub connector_config: ConnectorConfig,
77    /// Deployment mode (desktop, self_host, or cloud).
78    pub deployment_mode: DeploymentMode,
79    /// Pending OAuth PKCE challenges keyed by state parameter.
80    pub pending_oauth: Mutex<HashMap<String, PendingOAuth>>,
81    /// Per-account X API token managers for automatic token refresh.
82    pub token_managers: Mutex<HashMap<String, Arc<TokenManager>>>,
83    /// X API client ID from config (needed to create token managers).
84    pub x_client_id: String,
85}
86
87impl AppState {
88    /// Get a fresh X API access token for the given account.
89    ///
90    /// Lazily creates a `TokenManager` on first use (loading tokens from disk),
91    /// then returns a token that is automatically refreshed before expiry.
92    pub async fn get_x_access_token(
93        &self,
94        token_path: &std::path::Path,
95        account_id: &str,
96    ) -> Result<String, XApiError> {
97        // Fast path: token manager already exists.
98        {
99            let managers = self.token_managers.lock().await;
100            if let Some(tm) = managers.get(account_id) {
101                return tm.get_access_token().await;
102            }
103        }
104
105        // Load tokens from disk and create a new manager.
106        let tokens = auth::load_tokens(token_path)?.ok_or(XApiError::AuthExpired)?;
107
108        let tm = Arc::new(TokenManager::new(
109            tokens,
110            self.x_client_id.clone(),
111            token_path.to_path_buf(),
112        ));
113
114        let access_token = tm.get_access_token().await?;
115
116        self.token_managers
117            .lock()
118            .await
119            .insert(account_id.to_string(), tm);
120
121        Ok(access_token)
122    }
123
124    /// Load the effective config for a given account.
125    ///
126    /// Default account: reads config.toml directly (backward compat).
127    /// Non-default: merges config.toml base with account's `config_overrides` from DB.
128    pub async fn load_effective_config(&self, account_id: &str) -> Result<Config, String> {
129        let contents = std::fs::read_to_string(&self.config_path).unwrap_or_default();
130        let base: Config = toml::from_str(&contents).unwrap_or_default();
131
132        if account_id == DEFAULT_ACCOUNT_ID {
133            return Ok(base);
134        }
135
136        let account = accounts::get_account(&self.db, account_id)
137            .await
138            .map_err(|e| e.to_string())?
139            .ok_or_else(|| format!("account not found: {account_id}"))?;
140
141        effective_config(&base, &account.config_overrides)
142            .map(|r| r.config)
143            .map_err(|e| e.to_string())
144    }
145
146    /// Lazily create or return a cached `ContentGenerator` for the given account.
147    ///
148    /// Loads effective config, creates the LLM provider, and caches the generator.
149    pub async fn get_or_create_content_generator(
150        &self,
151        account_id: &str,
152    ) -> Result<Arc<ContentGenerator>, String> {
153        // Fast path: already cached.
154        {
155            let generators = self.content_generators.lock().await;
156            if let Some(gen) = generators.get(account_id) {
157                return Ok(gen.clone());
158            }
159        }
160
161        let config = self.load_effective_config(account_id).await?;
162
163        let provider =
164            create_provider(&config.llm).map_err(|e| format!("LLM not configured: {e}"))?;
165
166        let gen = Arc::new(ContentGenerator::new(provider, config.business));
167
168        self.content_generators
169            .lock()
170            .await
171            .insert(account_id.to_string(), gen.clone());
172
173        Ok(gen)
174    }
175
176    /// Returns `true` if the current deployment mode is local-first (Desktop).
177    pub fn is_local_first(&self) -> bool {
178        self.deployment_mode.is_local_first()
179    }
180
181    /// Cancel the running Watchtower (if any), reload config from disk,
182    /// and spawn a new Watchtower loop with the updated sources.
183    ///
184    /// Called after `PATCH /api/settings` modifies `content_sources` or
185    /// `deployment_mode`.
186    pub async fn restart_watchtower(&self) {
187        // 1. Cancel existing watchtower.
188        if let Some(cancel) = self.watchtower_cancel.write().await.take() {
189            cancel.cancel();
190            tracing::info!("Watchtower cancelled for config reload");
191        }
192
193        // 2. Reload config from disk.
194        let loaded_config = Config::load(Some(&self.config_path.to_string_lossy())).ok();
195        let new_sources = loaded_config
196            .as_ref()
197            .map(|c| c.content_sources.clone())
198            .unwrap_or_default();
199        let connector_config = loaded_config
200            .as_ref()
201            .map(|c| c.connectors.clone())
202            .unwrap_or_default();
203        let deployment_mode = loaded_config
204            .as_ref()
205            .map(|c| c.deployment_mode.clone())
206            .unwrap_or_default();
207
208        // 3. Check if any sources are enabled and eligible.
209        let has_enabled: Vec<_> = new_sources
210            .sources
211            .iter()
212            .filter(|s| {
213                s.is_enabled()
214                    && deployment_mode.allows_source_type(&s.source_type)
215                    && (s.path.is_some() || s.folder_id.is_some())
216            })
217            .collect();
218
219        if has_enabled.is_empty() {
220            tracing::info!("Watchtower restart: no enabled sources, not spawning");
221            *self.content_sources.write().await = new_sources;
222            return;
223        }
224
225        // Surface privacy envelope for operators: local_fs in non-Desktop mode
226        // means data is user-controlled but not same-machine local-first.
227        if !deployment_mode.is_local_first()
228            && has_enabled.iter().any(|s| s.source_type == "local_fs")
229        {
230            tracing::info!(
231                mode = %deployment_mode,
232                "local_fs source in {} mode — data is user-controlled but not local-first",
233                deployment_mode
234            );
235        }
236
237        // 4. Spawn new WatchtowerLoop.
238        let cancel = CancellationToken::new();
239        let watchtower = WatchtowerLoop::new(
240            self.db.clone(),
241            new_sources.clone(),
242            connector_config,
243            self.data_dir.clone(),
244        );
245        let cancel_clone = cancel.clone();
246        tokio::spawn(async move {
247            watchtower.run(cancel_clone).await;
248        });
249
250        tracing::info!(
251            sources = has_enabled.len(),
252            "Watchtower restarted with updated config"
253        );
254
255        // 5. Update state.
256        *self.watchtower_cancel.write().await = Some(cancel);
257        *self.content_sources.write().await = new_sources;
258    }
259}