Skip to main content

tuitbot_server/
state.rs

1//! Shared application state for the tuitbot server.
2
3use std::collections::HashMap;
4use std::net::IpAddr;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::time::{Instant, SystemTime};
8
9use tokio::sync::{broadcast, Mutex, RwLock};
10use tokio_util::sync::CancellationToken;
11use tuitbot_core::automation::circuit_breaker::CircuitBreaker;
12use tuitbot_core::automation::Runtime;
13use tuitbot_core::automation::WatchtowerLoop;
14use tuitbot_core::config::{
15    effective_config, Config, ConnectorConfig, ContentSourcesConfig, DeploymentMode,
16};
17use tuitbot_core::content::ContentGenerator;
18use tuitbot_core::context::semantic_index::SemanticIndex;
19use tuitbot_core::llm::embedding::EmbeddingProvider;
20use tuitbot_core::llm::factory::create_provider;
21use tuitbot_core::storage::accounts::{self, DEFAULT_ACCOUNT_ID};
22use tuitbot_core::storage::DbPool;
23use tuitbot_core::x_api::auth::TokenManager;
24use tuitbot_core::x_api::ScraperHealth;
25
26use tuitbot_core::error::XApiError;
27use tuitbot_core::x_api::auth;
28
29use crate::ws::AccountWsEvent;
30
/// Pending OAuth PKCE state for connector link flows.
///
/// Holds everything the OAuth callback needs in order to finish a token
/// exchange that was started earlier. Entries are kept in
/// `AppState::pending_oauth`, keyed by the OAuth `state` parameter.
pub struct PendingOAuth {
    /// The PKCE code verifier needed to complete the token exchange.
    pub code_verifier: String,
    /// When this entry was created (for 10-minute expiry).
    ///
    /// NOTE(review): the expiry itself is not enforced in this file —
    /// presumably the code that reads the entry compares against this
    /// timestamp; confirm at the call site.
    pub created_at: Instant,
    /// The account ID that initiated this OAuth flow (empty for connectors).
    pub account_id: String,
    /// The X API client ID used for this flow (for callback token exchange).
    pub client_id: String,
}
42
/// Shared application state accessible by all route handlers.
///
/// Every `Mutex` / `RwLock` in this struct is the `tokio::sync` variant, so
/// guards are acquired with `.lock().await` / `.read().await` /
/// `.write().await` and may be held across `.await` points.
pub struct AppState {
    /// SQLite connection pool.
    pub db: DbPool,
    /// Path to the configuration file.
    pub config_path: PathBuf,
    /// Data directory for media storage (parent of config file).
    pub data_dir: PathBuf,
    /// Broadcast channel sender for real-time WebSocket events.
    pub event_tx: broadcast::Sender<AccountWsEvent>,
    /// Local bearer token for API authentication.
    pub api_token: String,
    /// Bcrypt hash of the web login passphrase (None if not configured).
    pub passphrase_hash: RwLock<Option<String>>,
    /// Last-observed mtime of the `passphrase_hash` file (for detecting out-of-band resets).
    pub passphrase_hash_mtime: RwLock<Option<SystemTime>>,
    /// Host address the server is bound to.
    pub bind_host: String,
    /// Port the server is listening on.
    pub bind_port: u16,
    /// Per-IP login attempt tracking for rate limiting.
    ///
    /// The value tuple is `(count, window_start)`: the number of attempts
    /// seen and the instant the current counting window began.
    pub login_attempts: Mutex<HashMap<IpAddr, (u32, Instant)>>,
    /// Per-account automation runtimes (keyed by account_id).
    pub runtimes: Mutex<HashMap<String, Runtime>>,
    /// Per-account content generators for AI assist endpoints
    /// (keyed by account_id, created lazily on first use).
    pub content_generators: Mutex<HashMap<String, Arc<ContentGenerator>>>,
    /// Optional circuit breaker for X API rate-limit protection.
    pub circuit_breaker: Option<Arc<CircuitBreaker>>,
    /// Optional scraper health tracker (populated when provider_backend = "scraper").
    pub scraper_health: Option<ScraperHealth>,
    /// Cancellation token for the Watchtower filesystem watcher (None if not running).
    pub watchtower_cancel: RwLock<Option<CancellationToken>>,
    /// Content sources configuration for the Watchtower.
    pub content_sources: RwLock<ContentSourcesConfig>,
    /// Connector configuration for remote source OAuth flows.
    pub connector_config: ConnectorConfig,
    /// Deployment mode (desktop, self_host, or cloud).
    pub deployment_mode: DeploymentMode,
    /// Pending OAuth PKCE challenges keyed by state parameter.
    pub pending_oauth: Mutex<HashMap<String, PendingOAuth>>,
    /// Per-account X API token managers for automatic token refresh
    /// (keyed by account_id, created lazily on first use).
    pub token_managers: Mutex<HashMap<String, Arc<TokenManager>>>,
    /// X API client ID from config (needed to create token managers).
    pub x_client_id: String,
    /// In-memory semantic search index (None if embedding not configured).
    pub semantic_index: Option<Arc<RwLock<SemanticIndex>>>,
    /// Embedding provider for semantic indexing (None if not configured).
    pub embedding_provider: Option<Arc<dyn EmbeddingProvider>>,
}
92
93impl AppState {
94    /// Get a fresh X API access token for the given account.
95    ///
96    /// Lazily creates a `TokenManager` on first use (loading tokens from disk),
97    /// then returns a token that is automatically refreshed before expiry.
98    pub async fn get_x_access_token(
99        &self,
100        token_path: &std::path::Path,
101        account_id: &str,
102    ) -> Result<String, XApiError> {
103        // Fast path: token manager already exists.
104        {
105            let managers = self.token_managers.lock().await;
106            if let Some(tm) = managers.get(account_id) {
107                return tm.get_access_token().await;
108            }
109        }
110
111        // Load tokens from disk and create a new manager.
112        let tokens = auth::load_tokens(token_path)?.ok_or(XApiError::AuthExpired)?;
113
114        let tm = Arc::new(TokenManager::new(
115            tokens,
116            self.x_client_id.clone(),
117            token_path.to_path_buf(),
118        ));
119
120        let access_token = tm.get_access_token().await?;
121
122        self.token_managers
123            .lock()
124            .await
125            .insert(account_id.to_string(), tm);
126
127        Ok(access_token)
128    }
129
130    /// Load the effective config for a given account.
131    ///
132    /// Default account: reads config.toml directly (backward compat).
133    /// Non-default: merges config.toml base with account's `config_overrides` from DB.
134    pub async fn load_effective_config(&self, account_id: &str) -> Result<Config, String> {
135        let contents = std::fs::read_to_string(&self.config_path).unwrap_or_default();
136        let base: Config = toml::from_str(&contents).unwrap_or_default();
137
138        if account_id == DEFAULT_ACCOUNT_ID {
139            return Ok(base);
140        }
141
142        let account = accounts::get_account(&self.db, account_id)
143            .await
144            .map_err(|e| e.to_string())?
145            .ok_or_else(|| format!("account not found: {account_id}"))?;
146
147        effective_config(&base, &account.config_overrides)
148            .map(|r| r.config)
149            .map_err(|e| e.to_string())
150    }
151
152    /// Lazily create or return a cached `ContentGenerator` for the given account.
153    ///
154    /// Loads effective config, creates the LLM provider, and caches the generator.
155    pub async fn get_or_create_content_generator(
156        &self,
157        account_id: &str,
158    ) -> Result<Arc<ContentGenerator>, String> {
159        // Fast path: already cached.
160        {
161            let generators = self.content_generators.lock().await;
162            if let Some(gen) = generators.get(account_id) {
163                return Ok(gen.clone());
164            }
165        }
166
167        let config = self.load_effective_config(account_id).await?;
168
169        let provider =
170            create_provider(&config.llm).map_err(|e| format!("LLM not configured: {e}"))?;
171
172        let gen = Arc::new(ContentGenerator::new(provider, config.business));
173
174        self.content_generators
175            .lock()
176            .await
177            .insert(account_id.to_string(), gen.clone());
178
179        Ok(gen)
180    }
181
182    /// Returns `true` if the current deployment mode is local-first (Desktop).
183    pub fn is_local_first(&self) -> bool {
184        self.deployment_mode.is_local_first()
185    }
186
187    /// Cancel the running Watchtower (if any), reload config from disk,
188    /// and spawn a new Watchtower loop with the updated sources.
189    ///
190    /// Called after `PATCH /api/settings` modifies `content_sources` or
191    /// `deployment_mode`.
192    pub async fn restart_watchtower(&self) {
193        // 1. Cancel existing watchtower.
194        if let Some(cancel) = self.watchtower_cancel.write().await.take() {
195            cancel.cancel();
196            tracing::info!("Watchtower cancelled for config reload");
197        }
198
199        // 2. Reload config from disk.
200        let loaded_config = Config::load(Some(&self.config_path.to_string_lossy())).ok();
201        let new_sources = loaded_config
202            .as_ref()
203            .map(|c| c.content_sources.clone())
204            .unwrap_or_default();
205        let connector_config = loaded_config
206            .as_ref()
207            .map(|c| c.connectors.clone())
208            .unwrap_or_default();
209        let deployment_mode = loaded_config
210            .as_ref()
211            .map(|c| c.deployment_mode.clone())
212            .unwrap_or_default();
213
214        // 3. Check if any sources are enabled and eligible.
215        let has_enabled: Vec<_> = new_sources
216            .sources
217            .iter()
218            .filter(|s| {
219                s.is_enabled()
220                    && deployment_mode.allows_source_type(&s.source_type)
221                    && (s.path.is_some() || s.folder_id.is_some())
222            })
223            .collect();
224
225        if has_enabled.is_empty() {
226            tracing::info!("Watchtower restart: no enabled sources, not spawning");
227            *self.content_sources.write().await = new_sources;
228            return;
229        }
230
231        // Surface privacy envelope for operators: local_fs in non-Desktop mode
232        // means data is user-controlled but not same-machine local-first.
233        if !deployment_mode.is_local_first()
234            && has_enabled.iter().any(|s| s.source_type == "local_fs")
235        {
236            tracing::info!(
237                mode = %deployment_mode,
238                "local_fs source in {} mode — data is user-controlled but not local-first",
239                deployment_mode
240            );
241        }
242
243        // 4. Spawn new WatchtowerLoop.
244        let cancel = CancellationToken::new();
245        let watchtower = WatchtowerLoop::new(
246            self.db.clone(),
247            new_sources.clone(),
248            connector_config,
249            self.data_dir.clone(),
250        );
251        let cancel_clone = cancel.clone();
252        tokio::spawn(async move {
253            watchtower.run(cancel_clone).await;
254        });
255
256        tracing::info!(
257            sources = has_enabled.len(),
258            "Watchtower restarted with updated config"
259        );
260
261        // 5. Update state.
262        *self.watchtower_cancel.write().await = Some(cancel);
263        *self.content_sources.write().await = new_sources;
264    }
265}