Skip to main content

tuitbot_server/
state.rs

1//! Shared application state for the tuitbot server.
2
3use std::collections::HashMap;
4use std::net::IpAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Instant, SystemTime};
8
9use tokio::sync::{broadcast, Mutex, RwLock};
10use tokio_util::sync::CancellationToken;
11use tuitbot_core::automation::circuit_breaker::CircuitBreaker;
12use tuitbot_core::automation::Runtime;
13use tuitbot_core::automation::WatchtowerLoop;
14use tuitbot_core::config::{
15    effective_config, Config, ConnectorConfig, ContentSourcesConfig, DeploymentMode,
16};
17use tuitbot_core::content::ContentGenerator;
18use tuitbot_core::llm::factory::create_provider;
19use tuitbot_core::storage::accounts::{self, DEFAULT_ACCOUNT_ID};
20use tuitbot_core::storage::DbPool;
21use tuitbot_core::x_api::auth::TokenManager;
22
23use tuitbot_core::error::XApiError;
24use tuitbot_core::x_api::auth;
25
26use crate::ws::AccountWsEvent;
27
/// Pending OAuth PKCE state for connector link flows.
///
/// One entry is stored per in-flight authorization request; `AppState`
/// keeps these in `pending_oauth`, keyed by the OAuth `state` parameter.
pub struct PendingOAuth {
    /// The PKCE code verifier needed to complete the token exchange.
    /// This is secret material; avoid logging it.
    pub code_verifier: String,
    /// When this entry was created (for 10-minute expiry).
    /// NOTE(review): the expiry check itself is not in this file — confirm
    /// the 10-minute window against the code that consumes these entries.
    pub created_at: Instant,
    /// The account ID that initiated this OAuth flow (empty for connectors).
    pub account_id: String,
}
37
/// Shared application state accessible by all route handlers.
///
/// Fields that change after startup are wrapped in a `tokio::sync::Mutex` or
/// `tokio::sync::RwLock`, so the methods on this type can update them through
/// a shared `&self` reference. The remaining fields are plain values.
pub struct AppState {
    /// SQLite connection pool.
    pub db: DbPool,
    /// Path to the configuration file.
    pub config_path: PathBuf,
    /// Data directory for media storage (parent of config file).
    pub data_dir: PathBuf,
    /// Broadcast channel sender for real-time WebSocket events.
    pub event_tx: broadcast::Sender<AccountWsEvent>,
    /// Local bearer token for API authentication.
    pub api_token: String,
    /// Bcrypt hash of the web login passphrase (None if not configured).
    pub passphrase_hash: RwLock<Option<String>>,
    /// Last-observed mtime of the `passphrase_hash` file (for detecting out-of-band resets).
    pub passphrase_hash_mtime: RwLock<Option<SystemTime>>,
    /// Host address the server is bound to.
    pub bind_host: String,
    /// Port the server is listening on.
    pub bind_port: u16,
    /// Per-IP login attempt tracking for rate limiting: (count, window_start).
    pub login_attempts: Mutex<HashMap<IpAddr, (u32, Instant)>>,
    /// Per-account automation runtimes (keyed by account_id).
    pub runtimes: Mutex<HashMap<String, Runtime>>,
    /// Per-account content generators for AI assist endpoints.
    /// Populated lazily by `get_or_create_content_generator`.
    pub content_generators: Mutex<HashMap<String, Arc<ContentGenerator>>>,
    /// Optional circuit breaker for X API rate-limit protection.
    pub circuit_breaker: Option<Arc<CircuitBreaker>>,
    /// Cancellation token for the Watchtower filesystem watcher (None if not running).
    /// Replaced by `restart_watchtower` whenever a new loop is spawned.
    pub watchtower_cancel: RwLock<Option<CancellationToken>>,
    /// Content sources configuration for the Watchtower.
    /// Overwritten by `restart_watchtower` with the sources reloaded from disk.
    pub content_sources: RwLock<ContentSourcesConfig>,
    /// Connector configuration for remote source OAuth flows.
    pub connector_config: ConnectorConfig,
    /// Deployment mode (desktop, self_host, or cloud).
    pub deployment_mode: DeploymentMode,
    /// Pending OAuth PKCE challenges keyed by state parameter.
    pub pending_oauth: Mutex<HashMap<String, PendingOAuth>>,
    /// Per-account X API token managers for automatic token refresh.
    /// Populated lazily by `get_x_access_token`.
    pub token_managers: Mutex<HashMap<String, Arc<TokenManager>>>,
    /// X API client ID from config (needed to create token managers).
    pub x_client_id: String,
}
81
82impl AppState {
83    /// Get a fresh X API access token for the given account.
84    ///
85    /// Lazily creates a `TokenManager` on first use (loading tokens from disk),
86    /// then returns a token that is automatically refreshed before expiry.
87    pub async fn get_x_access_token(
88        &self,
89        token_path: &std::path::Path,
90        account_id: &str,
91    ) -> Result<String, XApiError> {
92        // Fast path: token manager already exists.
93        {
94            let managers = self.token_managers.lock().await;
95            if let Some(tm) = managers.get(account_id) {
96                return tm.get_access_token().await;
97            }
98        }
99
100        // Load tokens from disk and create a new manager.
101        let tokens = auth::load_tokens(token_path)?.ok_or(XApiError::AuthExpired)?;
102
103        let tm = Arc::new(TokenManager::new(
104            tokens,
105            self.x_client_id.clone(),
106            token_path.to_path_buf(),
107        ));
108
109        let access_token = tm.get_access_token().await?;
110
111        self.token_managers
112            .lock()
113            .await
114            .insert(account_id.to_string(), tm);
115
116        Ok(access_token)
117    }
118
119    /// Load the effective config for a given account.
120    ///
121    /// Default account: reads config.toml directly (backward compat).
122    /// Non-default: merges config.toml base with account's `config_overrides` from DB.
123    pub async fn load_effective_config(&self, account_id: &str) -> Result<Config, String> {
124        let contents = std::fs::read_to_string(&self.config_path).unwrap_or_default();
125        let base: Config = toml::from_str(&contents).unwrap_or_default();
126
127        if account_id == DEFAULT_ACCOUNT_ID {
128            return Ok(base);
129        }
130
131        let account = accounts::get_account(&self.db, account_id)
132            .await
133            .map_err(|e| e.to_string())?
134            .ok_or_else(|| format!("account not found: {account_id}"))?;
135
136        effective_config(&base, &account.config_overrides)
137            .map(|r| r.config)
138            .map_err(|e| e.to_string())
139    }
140
141    /// Lazily create or return a cached `ContentGenerator` for the given account.
142    ///
143    /// Loads effective config, creates the LLM provider, and caches the generator.
144    pub async fn get_or_create_content_generator(
145        &self,
146        account_id: &str,
147    ) -> Result<Arc<ContentGenerator>, String> {
148        // Fast path: already cached.
149        {
150            let generators = self.content_generators.lock().await;
151            if let Some(gen) = generators.get(account_id) {
152                return Ok(gen.clone());
153            }
154        }
155
156        let config = self.load_effective_config(account_id).await?;
157
158        let provider =
159            create_provider(&config.llm).map_err(|e| format!("LLM not configured: {e}"))?;
160
161        let gen = Arc::new(ContentGenerator::new(provider, config.business));
162
163        self.content_generators
164            .lock()
165            .await
166            .insert(account_id.to_string(), gen.clone());
167
168        Ok(gen)
169    }
170
171    /// Cancel the running Watchtower (if any), reload config from disk,
172    /// and spawn a new Watchtower loop with the updated sources.
173    ///
174    /// Called after `PATCH /api/settings` modifies `content_sources` or
175    /// `deployment_mode`.
176    pub async fn restart_watchtower(&self) {
177        // 1. Cancel existing watchtower.
178        if let Some(cancel) = self.watchtower_cancel.write().await.take() {
179            cancel.cancel();
180            tracing::info!("Watchtower cancelled for config reload");
181        }
182
183        // 2. Reload config from disk.
184        let loaded_config = Config::load(Some(&self.config_path.to_string_lossy())).ok();
185        let new_sources = loaded_config
186            .as_ref()
187            .map(|c| c.content_sources.clone())
188            .unwrap_or_default();
189        let connector_config = loaded_config
190            .as_ref()
191            .map(|c| c.connectors.clone())
192            .unwrap_or_default();
193        let deployment_mode = loaded_config
194            .as_ref()
195            .map(|c| c.deployment_mode.clone())
196            .unwrap_or_default();
197
198        // 3. Check if any sources are enabled and eligible.
199        let has_enabled: Vec<_> = new_sources
200            .sources
201            .iter()
202            .filter(|s| {
203                s.is_enabled()
204                    && deployment_mode.allows_source_type(&s.source_type)
205                    && (s.path.is_some() || s.folder_id.is_some())
206            })
207            .collect();
208
209        if has_enabled.is_empty() {
210            tracing::info!("Watchtower restart: no enabled sources, not spawning");
211            *self.content_sources.write().await = new_sources;
212            return;
213        }
214
215        // 4. Spawn new WatchtowerLoop.
216        let cancel = CancellationToken::new();
217        let watchtower = WatchtowerLoop::new(
218            self.db.clone(),
219            new_sources.clone(),
220            connector_config,
221            self.data_dir.clone(),
222        );
223        let cancel_clone = cancel.clone();
224        tokio::spawn(async move {
225            watchtower.run(cancel_clone).await;
226        });
227
228        tracing::info!(
229            sources = has_enabled.len(),
230            "Watchtower restarted with updated config"
231        );
232
233        // 5. Update state.
234        *self.watchtower_cancel.write().await = Some(cancel);
235        *self.content_sources.write().await = new_sources;
236    }
237}