Skip to main content

localgpt_server/security/
bridge.rs

1use anyhow::Result;
2use chacha20poly1305::{
3    ChaCha20Poly1305, Key, Nonce,
4    aead::{Aead, KeyInit},
5};
6use chrono::{DateTime, Utc};
7use hmac::{Hmac, Mac};
8use localgpt_bridge::peer_identity::{PeerIdentity, get_peer_identity};
9use localgpt_bridge::{BridgeError, BridgeServer, BridgeService};
10use rand::RngExt;
11use serde::Serialize;
12use sha2::Sha256;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tarpc::context;
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21use localgpt_core::agent::{Agent, AgentConfig};
22use localgpt_core::config::Config;
23use localgpt_core::memory::MemoryManager;
24use localgpt_core::paths::Paths;
25use localgpt_core::security::read_device_key;
26
27/// Agent ID used for bridge CLI sessions.
28const BRIDGE_CLI_AGENT_ID: &str = "bridge-cli";
29
30/// Health status of a bridge connection
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
32#[serde(rename_all = "snake_case")]
33pub enum HealthStatus {
34    /// Bridge is actively communicating
35    Healthy,
36    /// Bridge hasn't been seen recently (warning)
37    Degraded,
38    /// Bridge is unresponsive (critical)
39    Unhealthy,
40}
41
42/// Status and health info for a connected bridge.
43#[derive(Debug, Clone, Serialize)]
44pub struct BridgeStatus {
45    pub connection_id: String,
46    pub bridge_id: Option<String>,
47    pub connected_at: DateTime<Utc>,
48    pub last_active: DateTime<Utc>,
49    pub pid: Option<i32>,
50    pub uid: Option<u32>,
51    /// Current health status based on last_active time
52    pub health: HealthStatus,
53    /// Number of consecutive health check failures
54    pub consecutive_failures: u32,
55}
56
57/// Configuration for bridge health monitoring
58#[derive(Debug, Clone)]
59pub struct HealthCheckConfig {
60    /// How often to check bridge health (default: 30s)
61    pub check_interval: Duration,
62    /// Time without activity before marking as degraded (default: 60s)
63    pub degraded_threshold: Duration,
64    /// Time without activity before marking as unhealthy (default: 120s)
65    pub unhealthy_threshold: Duration,
66    /// Whether to log warnings for unhealthy bridges
67    pub log_warnings: bool,
68}
69
70impl Default for HealthCheckConfig {
71    fn default() -> Self {
72        Self {
73            check_interval: Duration::from_secs(30),
74            degraded_threshold: Duration::from_secs(60),
75            unhealthy_threshold: Duration::from_secs(120),
76            log_warnings: true,
77        }
78    }
79}
80
81/// Shared agent session for bridge CLI connections.
82struct AgentSession {
83    agent: Agent,
84}
85
86/// Optional agent support for handling chat/memory RPCs.
87struct AgentSupport {
88    config: Config,
89    memory: Arc<MemoryManager>,
90    sessions: tokio::sync::Mutex<HashMap<String, AgentSession>>,
91}
92
93/// Manages bridge processes and their credentials.
94#[derive(Clone)]
95pub struct BridgeManager {
96    // In-memory cache of decrypted credentials
97    credentials: Arc<RwLock<HashMap<String, Vec<u8>>>>,
98    // Active connections: connection_id -> info
99    active_bridges: Arc<RwLock<HashMap<String, BridgeStatus>>>,
100    // Optional agent support for CLI bridge
101    agent_support: Option<Arc<AgentSupport>>,
102    // Health check configuration
103    health_config: HealthCheckConfig,
104}
105
106impl BridgeManager {
107    pub fn new() -> Self {
108        Self {
109            credentials: Arc::new(RwLock::new(HashMap::new())),
110            active_bridges: Arc::new(RwLock::new(HashMap::new())),
111            agent_support: None,
112            health_config: HealthCheckConfig::default(),
113        }
114    }
115
116    /// Create a BridgeManager with agent support for handling chat/memory RPCs.
117    /// This is used by the daemon when serving bridge CLI connections.
118    pub fn new_with_agent_support(config: Config, memory: MemoryManager) -> Self {
119        Self {
120            credentials: Arc::new(RwLock::new(HashMap::new())),
121            active_bridges: Arc::new(RwLock::new(HashMap::new())),
122            agent_support: Some(Arc::new(AgentSupport {
123                config,
124                memory: Arc::new(memory),
125                sessions: tokio::sync::Mutex::new(HashMap::new()),
126            })),
127            health_config: HealthCheckConfig::default(),
128        }
129    }
130
131    /// Create with custom health check configuration
132    pub fn with_health_config(config: HealthCheckConfig) -> Self {
133        Self {
134            credentials: Arc::new(RwLock::new(HashMap::new())),
135            active_bridges: Arc::new(RwLock::new(HashMap::new())),
136            agent_support: None,
137            health_config: config,
138        }
139    }
140
141    /// Start the background health check task
142    pub fn start_health_checker(&self) -> tokio::task::JoinHandle<()> {
143        let manager = self.clone();
144        let interval = self.health_config.check_interval;
145
146        tokio::spawn(async move {
147            let mut timer = tokio::time::interval(interval);
148            loop {
149                timer.tick().await;
150                manager.check_bridge_health().await;
151            }
152        })
153    }
154
155    /// Check health of all bridges and update their status
156    async fn check_bridge_health(&self) {
157        let now = Utc::now();
158        let config = &self.health_config;
159        let mut bridges = self.active_bridges.write().await;
160
161        for (_id, status) in bridges.iter_mut() {
162            let elapsed = (now - status.last_active)
163                .to_std()
164                .unwrap_or(Duration::ZERO);
165
166            let previous_health = status.health;
167            let previous_failures = status.consecutive_failures;
168
169            // Determine health based on elapsed time since last activity
170            if elapsed > config.unhealthy_threshold {
171                status.health = HealthStatus::Unhealthy;
172                status.consecutive_failures += 1;
173            } else if elapsed > config.degraded_threshold {
174                status.health = HealthStatus::Degraded;
175                status.consecutive_failures += 1;
176            } else {
177                status.health = HealthStatus::Healthy;
178                status.consecutive_failures = 0;
179            }
180
181            // Log warnings on state changes or continued unhealthy state
182            if config.log_warnings {
183                if status.health != previous_health {
184                    match status.health {
185                        HealthStatus::Degraded => {
186                            warn!(
187                                "Bridge {} (connection {}) is degraded - no activity for {:?}",
188                                status.bridge_id.as_deref().unwrap_or("unknown"),
189                                status.connection_id,
190                                elapsed
191                            );
192                        }
193                        HealthStatus::Unhealthy => {
194                            error!(
195                                "Bridge {} (connection {}) is unhealthy - no activity for {:?}",
196                                status.bridge_id.as_deref().unwrap_or("unknown"),
197                                status.connection_id,
198                                elapsed
199                            );
200                        }
201                        HealthStatus::Healthy => {
202                            info!(
203                                "Bridge {} (connection {}) is now healthy",
204                                status.bridge_id.as_deref().unwrap_or("unknown"),
205                                status.connection_id
206                            );
207                        }
208                    }
209                } else if status.health == HealthStatus::Unhealthy
210                    && status.consecutive_failures > previous_failures
211                    && status.consecutive_failures % 3 == 0
212                {
213                    // Log every 3rd consecutive failure
214                    error!(
215                        "Bridge {} (connection {}) still unhealthy (failures: {})",
216                        status.bridge_id.as_deref().unwrap_or("unknown"),
217                        status.connection_id,
218                        status.consecutive_failures
219                    );
220                }
221            }
222        }
223    }
224
225    /// Return status of all active bridge connections.
226    pub async fn get_active_bridges(&self) -> Vec<BridgeStatus> {
227        self.active_bridges.read().await.values().cloned().collect()
228    }
229
230    async fn add_connection(&self, id: &str, identity: &PeerIdentity) {
231        let status = BridgeStatus {
232            connection_id: id.to_string(),
233            bridge_id: None,
234            connected_at: Utc::now(),
235            last_active: Utc::now(),
236            pid: identity.pid,
237            uid: identity.uid,
238            health: HealthStatus::Healthy,
239            consecutive_failures: 0,
240        };
241        self.active_bridges
242            .write()
243            .await
244            .insert(id.to_string(), status);
245    }
246
247    async fn update_active(&self, id: &str, bridge_id: Option<String>) {
248        let mut active = self.active_bridges.write().await;
249        if let Some(status) = active.get_mut(id) {
250            status.last_active = Utc::now();
251            status.health = HealthStatus::Healthy;
252            status.consecutive_failures = 0;
253            if bridge_id.is_some() {
254                status.bridge_id = bridge_id;
255            }
256        }
257    }
258
259    async fn remove_connection(&self, id: &str) {
260        self.active_bridges.write().await.remove(id);
261    }
262
263    /// Register a new bridge secret.
264    /// Encrypts and saves to disk, and updates cache.
265    pub async fn register_bridge(&self, bridge_id: &str, secret: &[u8]) -> Result<()> {
266        validate_bridge_id(bridge_id)?;
267
268        let paths = Paths::resolve()?;
269        let bridges_dir = paths.data_dir.join("bridges");
270        std::fs::create_dir_all(&bridges_dir)?;
271
272        // 1. Get Master Key
273        let master_key = read_device_key(&paths.data_dir)?;
274
275        // 2. Derive Bridge Key = HMAC-SHA256(MasterKey, "bridge-key:" + bridge_id)
276        let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
277
278        // 3. Encrypt Secret
279        let cipher = ChaCha20Poly1305::new(&bridge_key);
280
281        // Generate nonce manually to avoid rand_core version mismatch
282        let mut nonce_bytes = [0u8; 12];
283        rand::rng().fill(&mut nonce_bytes);
284        let nonce = Nonce::from_slice(&nonce_bytes);
285
286        let ciphertext = cipher
287            .encrypt(nonce, secret)
288            .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
289
290        // 4. Save to file: [Nonce (12 bytes)][Ciphertext]
291        let mut file_content = nonce_bytes.to_vec();
292        file_content.extend(ciphertext);
293
294        let file_path = bridges_dir.join(format!("{}.enc", bridge_id));
295        std::fs::write(&file_path, file_content)?;
296
297        #[cfg(unix)]
298        {
299            use std::os::unix::fs::PermissionsExt;
300            std::fs::set_permissions(&file_path, std::fs::Permissions::from_mode(0o600))?;
301        }
302
303        // 5. Update Cache
304        let mut creds = self.credentials.write().await;
305        creds.insert(bridge_id.to_string(), secret.to_vec());
306
307        info!("Registered credentials for bridge: {}", bridge_id);
308        Ok(())
309    }
310
311    /// Retrieve credentials if the identity is authorized.
312    /// Loads from disk if not in cache.
313    pub async fn get_credentials_for(
314        &self,
315        bridge_id: &str,
316        identity: &PeerIdentity,
317    ) -> Result<Vec<u8>, BridgeError> {
318        if let Err(e) = validate_bridge_id(bridge_id) {
319            error!("Invalid bridge ID: {}", e);
320            return Err(BridgeError::AuthFailed("Invalid bridge ID".to_string()));
321        }
322
323        // Verify identity (Basic check for now)
324        // TODO: Implement stricter checks based on OS user or code signature
325        info!(
326            "Checking access for bridge: {} from {:?}",
327            bridge_id, identity
328        );
329
330        // Check cache first
331        {
332            let creds = self.credentials.read().await;
333            if let Some(secret) = creds.get(bridge_id) {
334                return Ok(secret.clone());
335            }
336        }
337
338        // Load from disk
339        match self.load_credentials_from_disk(bridge_id).await {
340            Ok(secret) => {
341                // Cache it
342                let mut creds = self.credentials.write().await;
343                creds.insert(bridge_id.to_string(), secret.clone());
344                Ok(secret)
345            }
346            Err(e) => {
347                error!("Failed to load credentials for {}: {}", bridge_id, e);
348                Err(BridgeError::NotRegistered)
349            }
350        }
351    }
352
353    async fn load_credentials_from_disk(&self, bridge_id: &str) -> Result<Vec<u8>> {
354        let paths = Paths::resolve()?;
355        let file_path = paths
356            .data_dir
357            .join("bridges")
358            .join(format!("{}.enc", bridge_id));
359
360        if !file_path.exists() {
361            anyhow::bail!("Credential file not found");
362        }
363
364        let file_content = std::fs::read(&file_path)?;
365        if file_content.len() < 12 {
366            anyhow::bail!("Invalid credential file format (too short)");
367        }
368
369        let (nonce_bytes, ciphertext) = file_content.split_at(12);
370        let nonce = Nonce::from_slice(nonce_bytes);
371
372        // Derive Key
373        let master_key = read_device_key(&paths.data_dir)?;
374        let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
375
376        // Decrypt
377        let cipher = ChaCha20Poly1305::new(&bridge_key);
378        let plaintext = cipher
379            .decrypt(nonce, ciphertext)
380            .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
381
382        Ok(plaintext)
383    }
384
385    /// Start the bridge server listening on the given socket path.
386    pub async fn serve(self, socket_path: &str) -> anyhow::Result<()> {
387        let listener = BridgeServer::bind(socket_path)?;
388        let manager = self.clone();
389
390        info!("BridgeManager listening on {}", socket_path);
391
392        loop {
393            let conn = match listener.accept().await {
394                Ok(c) => c,
395                Err(e) => {
396                    error!("Accept failed: {}", e);
397                    continue;
398                }
399            };
400
401            // Verify peer identity
402            let identity_result = {
403                #[cfg(unix)]
404                {
405                    get_peer_identity(&conn)
406                }
407                #[cfg(windows)]
408                {
409                    get_peer_identity(&conn)
410                }
411            };
412
413            let identity = match identity_result {
414                Ok(id) => {
415                    // Enforce UID matching (same-user only)
416                    #[cfg(unix)]
417                    {
418                        // SAFETY: getuid() is always safe to call — it has no arguments,
419                        // requires no preconditions, and simply returns the real user ID.
420                        let current_uid = unsafe { libc::getuid() };
421                        if let Some(peer_uid) = id.uid.filter(|&uid| uid != current_uid) {
422                            error!(
423                                "Rejected connection from UID {} (expected {})",
424                                peer_uid, current_uid
425                            );
426                            continue;
427                        }
428                    }
429                    id
430                }
431                Err(e) => {
432                    error!("Peer identity verification failed: {}", e);
433                    continue;
434                }
435            };
436
437            info!("Accepted connection from: {:?}", identity);
438
439            let connection_id = Uuid::new_v4().to_string();
440            manager.add_connection(&connection_id, &identity).await;
441
442            let handler = ConnectionHandler {
443                manager: manager.clone(),
444                identity,
445                connection_id: connection_id.clone(),
446            };
447
448            let connection_manager = manager.clone();
449            tokio::spawn(async move {
450                if let Err(e) = localgpt_bridge::handle_connection(conn, handler).await {
451                    debug!("Connection handling finished/error: {:?}", e);
452                }
453                connection_manager.remove_connection(&connection_id).await;
454            });
455        }
456    }
457}
458
459impl Default for BridgeManager {
460    fn default() -> Self {
461        Self::new()
462    }
463}
464
465fn derive_bridge_key(master_key: &[u8; 32], bridge_id: &str) -> Result<Key> {
466    type HmacSha256 = Hmac<Sha256>;
467    // Disambiguate Mac vs KeyInit
468    let mut mac = <HmacSha256 as Mac>::new_from_slice(master_key)
469        .map_err(|e| anyhow::anyhow!("HMAC init failed: {}", e))?;
470
471    mac.update(b"bridge-key:");
472    mac.update(bridge_id.as_bytes());
473
474    let result = mac.finalize().into_bytes();
475    // ChaCha20Poly1305 key is 32 bytes, which matches SHA256 output size.
476    Ok(*Key::from_slice(&result))
477}
478
479/// Per-connection handler that implements the BridgeService trait.
480#[derive(Clone)]
481struct ConnectionHandler {
482    manager: BridgeManager,
483    identity: PeerIdentity,
484    connection_id: String,
485}
486
487impl BridgeService for ConnectionHandler {
488    async fn get_version(self, _: context::Context) -> String {
489        self.manager.update_active(&self.connection_id, None).await;
490        localgpt_bridge::BRIDGE_PROTOCOL_VERSION.to_string()
491    }
492
493    async fn ping(self, _: context::Context) -> bool {
494        self.manager.update_active(&self.connection_id, None).await;
495        true
496    }
497
498    async fn get_credentials(
499        self,
500        _: context::Context,
501        bridge_id: String,
502    ) -> Result<Vec<u8>, BridgeError> {
503        self.manager
504            .update_active(&self.connection_id, Some(bridge_id.clone()))
505            .await;
506        self.manager
507            .get_credentials_for(&bridge_id, &self.identity)
508            .await
509    }
510
511    async fn chat(
512        self,
513        _: context::Context,
514        session_id: String,
515        message: String,
516    ) -> Result<String, BridgeError> {
517        self.manager.update_active(&self.connection_id, None).await;
518        let support = self
519            .manager
520            .agent_support
521            .as_ref()
522            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
523
524        let mut sessions = support.sessions.lock().await;
525
526        // Create session if it doesn't exist, using entry API to avoid unwrap
527        if let std::collections::hash_map::Entry::Vacant(entry) = sessions.entry(session_id.clone())
528        {
529            let agent_config = AgentConfig {
530                model: support.config.agent.default_model.clone(),
531                context_window: support.config.agent.context_window,
532                reserve_tokens: support.config.agent.reserve_tokens,
533            };
534            let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
535                .await
536                .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
537            agent
538                .new_session()
539                .await
540                .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
541            entry.insert(AgentSession { agent });
542        }
543
544        let session = sessions
545            .get_mut(&session_id)
546            .ok_or_else(|| BridgeError::Internal("Session unexpectedly missing".into()))?;
547        let response = session
548            .agent
549            .chat(&message)
550            .await
551            .map_err(|e| BridgeError::Internal(format!("Chat error: {}", e)))?;
552
553        if let Err(e) = session
554            .agent
555            .save_session_for_agent(BRIDGE_CLI_AGENT_ID)
556            .await
557        {
558            warn!("Failed to save bridge-cli session: {}", e);
559        }
560
561        Ok(response)
562    }
563
564    async fn new_session(
565        self,
566        _: context::Context,
567        session_id: String,
568    ) -> Result<String, BridgeError> {
569        self.manager.update_active(&self.connection_id, None).await;
570        let support = self
571            .manager
572            .agent_support
573            .as_ref()
574            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
575
576        let mut sessions = support.sessions.lock().await;
577
578        let agent_config = AgentConfig {
579            model: support.config.agent.default_model.clone(),
580            context_window: support.config.agent.context_window,
581            reserve_tokens: support.config.agent.reserve_tokens,
582        };
583        let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
584            .await
585            .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
586        agent
587            .new_session()
588            .await
589            .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
590
591        let model = agent.model().to_string();
592        let chunks = agent.memory_chunk_count();
593        sessions.insert(session_id, AgentSession { agent });
594
595        Ok(format!(
596            "New session created. Model: {} | Memory: {} chunks",
597            model, chunks
598        ))
599    }
600
601    async fn session_status(
602        self,
603        _: context::Context,
604        session_id: String,
605    ) -> Result<String, BridgeError> {
606        self.manager.update_active(&self.connection_id, None).await;
607        let support = self
608            .manager
609            .agent_support
610            .as_ref()
611            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
612
613        let sessions = support.sessions.lock().await;
614        let session = sessions
615            .get(&session_id)
616            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
617
618        let status = session.agent.session_status();
619        let mut output = String::new();
620        output.push_str(&format!("Session ID: {}\n", status.id));
621        output.push_str(&format!("Model: {}\n", session.agent.model()));
622        output.push_str(&format!("Messages: {}\n", status.message_count));
623        output.push_str(&format!("Context tokens: ~{}\n", status.token_count));
624        output.push_str(&format!("Compactions: {}\n", status.compaction_count));
625        output.push_str(&format!(
626            "Memory chunks: {}",
627            session.agent.memory_chunk_count()
628        ));
629
630        if status.api_input_tokens > 0 || status.api_output_tokens > 0 {
631            output.push_str(&format!(
632                "\nAPI tokens: {} in / {} out",
633                status.api_input_tokens, status.api_output_tokens
634            ));
635        }
636
637        Ok(output)
638    }
639
640    async fn set_model(
641        self,
642        _: context::Context,
643        session_id: String,
644        model: String,
645    ) -> Result<String, BridgeError> {
646        self.manager.update_active(&self.connection_id, None).await;
647        let support = self
648            .manager
649            .agent_support
650            .as_ref()
651            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
652
653        let mut sessions = support.sessions.lock().await;
654        let session = sessions
655            .get_mut(&session_id)
656            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
657
658        session
659            .agent
660            .set_model(&model)
661            .map_err(|e| BridgeError::Internal(format!("Failed to set model: {}", e)))?;
662
663        Ok(format!("Switched to model: {}", model))
664    }
665
666    async fn compact_session(
667        self,
668        _: context::Context,
669        session_id: String,
670    ) -> Result<String, BridgeError> {
671        self.manager.update_active(&self.connection_id, None).await;
672        let support = self
673            .manager
674            .agent_support
675            .as_ref()
676            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
677
678        let mut sessions = support.sessions.lock().await;
679        let session = sessions
680            .get_mut(&session_id)
681            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
682
683        let (before, after) = session
684            .agent
685            .compact_session()
686            .await
687            .map_err(|e| BridgeError::Internal(format!("Failed to compact: {}", e)))?;
688
689        Ok(format!(
690            "Session compacted. Token count: {} → {}",
691            before, after
692        ))
693    }
694
695    async fn clear_session(
696        self,
697        _: context::Context,
698        session_id: String,
699    ) -> Result<String, BridgeError> {
700        self.manager.update_active(&self.connection_id, None).await;
701        let support = self
702            .manager
703            .agent_support
704            .as_ref()
705            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
706
707        let mut sessions = support.sessions.lock().await;
708        let session = sessions
709            .get_mut(&session_id)
710            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
711
712        session.agent.clear_session();
713        Ok("Session cleared.".into())
714    }
715
716    async fn memory_search(
717        self,
718        _: context::Context,
719        query: String,
720        limit: u32,
721    ) -> Result<String, BridgeError> {
722        self.manager.update_active(&self.connection_id, None).await;
723        let support = self
724            .manager
725            .agent_support
726            .as_ref()
727            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
728
729        let results = support
730            .memory
731            .search(&query, limit as usize)
732            .map_err(|e| BridgeError::Internal(format!("Memory search failed: {}", e)))?;
733
734        if results.is_empty() {
735            return Ok(format!("No results found for '{}'", query));
736        }
737
738        let mut output = format!("Found {} results for '{}':\n", results.len(), query);
739        for (i, result) in results.iter().enumerate() {
740            output.push_str(&format!(
741                "\n{}. {} (lines {}-{})\n",
742                i + 1,
743                result.file,
744                result.line_start,
745                result.line_end
746            ));
747            output.push_str(&format!("   Score: {:.3}\n", result.score));
748            let preview: String = result.content.chars().take(200).collect();
749            let preview = preview.replace('\n', " ");
750            output.push_str(&format!(
751                "   {}{}\n",
752                preview,
753                if result.content.len() > 200 {
754                    "..."
755                } else {
756                    ""
757                }
758            ));
759        }
760
761        Ok(output)
762    }
763
764    async fn memory_stats(self, _: context::Context) -> Result<String, BridgeError> {
765        self.manager.update_active(&self.connection_id, None).await;
766        let support = self
767            .manager
768            .agent_support
769            .as_ref()
770            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
771
772        let stats = support
773            .memory
774            .stats()
775            .map_err(|e| BridgeError::Internal(format!("Failed to get stats: {}", e)))?;
776
777        let mut output = String::new();
778        output.push_str("Memory Statistics\n");
779        output.push_str("-----------------\n");
780        output.push_str(&format!("Workspace: {}\n", stats.workspace));
781        output.push_str(&format!("Total files: {}\n", stats.total_files));
782        output.push_str(&format!("Total chunks: {}\n", stats.total_chunks));
783        output.push_str(&format!("Index size: {} KB\n", stats.index_size_kb));
784        output.push_str("\nFiles:\n");
785        for file in &stats.files {
786            output.push_str(&format!(
787                "  {} ({} chunks, {} lines)\n",
788                file.name, file.chunks, file.lines
789            ));
790        }
791
792        Ok(output)
793    }
794}
795
796fn validate_bridge_id(id: &str) -> Result<()> {
797    if id.is_empty() {
798        anyhow::bail!("Bridge ID cannot be empty");
799    }
800    if id.len() > 64 {
801        anyhow::bail!("Bridge ID is too long (max 64 chars)");
802    }
803    if !id
804        .chars()
805        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
806    {
807        anyhow::bail!("Bridge ID contains invalid characters. Allowed: a-z, A-Z, 0-9, -, _");
808    }
809    Ok(())
810}
811
812#[cfg(test)]
813mod tests {
814    use super::*;
815
816    #[test]
817    fn test_health_status_serialization() {
818        let healthy = HealthStatus::Healthy;
819        assert_eq!(serde_json::to_string(&healthy).unwrap(), "\"healthy\"");
820
821        let degraded = HealthStatus::Degraded;
822        assert_eq!(serde_json::to_string(&degraded).unwrap(), "\"degraded\"");
823
824        let unhealthy = HealthStatus::Unhealthy;
825        assert_eq!(serde_json::to_string(&unhealthy).unwrap(), "\"unhealthy\"");
826    }
827
828    #[test]
829    fn test_health_check_config_default() {
830        let config = HealthCheckConfig::default();
831        assert_eq!(config.check_interval, Duration::from_secs(30));
832        assert_eq!(config.degraded_threshold, Duration::from_secs(60));
833        assert_eq!(config.unhealthy_threshold, Duration::from_secs(120));
834        assert!(config.log_warnings);
835    }
836
837    #[tokio::test]
838    async fn test_bridge_status_initial_health() {
839        let manager = BridgeManager::new();
840        let identity = PeerIdentity {
841            pid: Some(1234),
842            uid: Some(1000),
843            gid: Some(1000),
844        };
845
846        manager.add_connection("test-conn", &identity).await;
847
848        let bridges = manager.get_active_bridges().await;
849        assert_eq!(bridges.len(), 1);
850        assert_eq!(bridges[0].health, HealthStatus::Healthy);
851        assert_eq!(bridges[0].consecutive_failures, 0);
852    }
853
854    #[tokio::test]
855    async fn test_update_active_resets_health() {
856        let manager = BridgeManager::new();
857        let identity = PeerIdentity {
858            pid: Some(1234),
859            uid: Some(1000),
860            gid: Some(1000),
861        };
862
863        manager.add_connection("test-conn", &identity).await;
864
865        // Simulate bridge going unhealthy
866        {
867            let mut bridges = manager.active_bridges.write().await;
868            let status = bridges.get_mut("test-conn").unwrap();
869            status.health = HealthStatus::Unhealthy;
870            status.consecutive_failures = 5;
871        }
872
873        // Update active should reset health
874        manager
875            .update_active("test-conn", Some("telegram".to_string()))
876            .await;
877
878        let bridges = manager.get_active_bridges().await;
879        assert_eq!(bridges[0].health, HealthStatus::Healthy);
880        assert_eq!(bridges[0].consecutive_failures, 0);
881        assert_eq!(bridges[0].bridge_id, Some("telegram".to_string()));
882    }
883
884    #[tokio::test]
885    async fn test_health_check_degraded() {
886        let config = HealthCheckConfig {
887            check_interval: Duration::from_secs(30),
888            degraded_threshold: Duration::from_secs(5),
889            unhealthy_threshold: Duration::from_secs(10),
890            log_warnings: false,
891        };
892        let manager = BridgeManager::with_health_config(config);
893        let identity = PeerIdentity {
894            pid: Some(1234),
895            uid: Some(1000),
896            gid: Some(1000),
897        };
898
899        manager.add_connection("test-conn", &identity).await;
900
901        // Simulate time passing by setting last_active to the past
902        {
903            let mut bridges = manager.active_bridges.write().await;
904            let status = bridges.get_mut("test-conn").unwrap();
905            // Set last_active to 7 seconds ago (past degraded threshold of 5s)
906            status.last_active = Utc::now() - chrono::Duration::seconds(7);
907        }
908
909        // Run health check
910        manager.check_bridge_health().await;
911
912        let bridges = manager.get_active_bridges().await;
913        assert_eq!(bridges[0].health, HealthStatus::Degraded);
914        assert_eq!(bridges[0].consecutive_failures, 1);
915    }
916
917    #[tokio::test]
918    async fn test_health_check_unhealthy() {
919        let config = HealthCheckConfig {
920            check_interval: Duration::from_secs(30),
921            degraded_threshold: Duration::from_secs(5),
922            unhealthy_threshold: Duration::from_secs(10),
923            log_warnings: false,
924        };
925        let manager = BridgeManager::with_health_config(config);
926        let identity = PeerIdentity {
927            pid: Some(1234),
928            uid: Some(1000),
929            gid: Some(1000),
930        };
931
932        manager.add_connection("test-conn", &identity).await;
933
934        // Simulate time passing by setting last_active to the past
935        {
936            let mut bridges = manager.active_bridges.write().await;
937            let status = bridges.get_mut("test-conn").unwrap();
938            // Set last_active to 15 seconds ago (past unhealthy threshold of 10s)
939            status.last_active = Utc::now() - chrono::Duration::seconds(15);
940        }
941
942        // Run health check
943        manager.check_bridge_health().await;
944
945        let bridges = manager.get_active_bridges().await;
946        assert_eq!(bridges[0].health, HealthStatus::Unhealthy);
947        assert_eq!(bridges[0].consecutive_failures, 1);
948    }
949
950    #[tokio::test]
951    async fn test_health_check_consecutive_failures() {
952        let config = HealthCheckConfig {
953            check_interval: Duration::from_secs(30),
954            degraded_threshold: Duration::from_secs(5),
955            unhealthy_threshold: Duration::from_secs(10),
956            log_warnings: false,
957        };
958        let manager = BridgeManager::with_health_config(config);
959        let identity = PeerIdentity {
960            pid: Some(1234),
961            uid: Some(1000),
962            gid: Some(1000),
963        };
964
965        manager.add_connection("test-conn", &identity).await;
966
967        // Simulate bridge that stays unhealthy
968        {
969            let mut bridges = manager.active_bridges.write().await;
970            let status = bridges.get_mut("test-conn").unwrap();
971            status.last_active = Utc::now() - chrono::Duration::seconds(15);
972        }
973
974        // Run health check 3 times
975        manager.check_bridge_health().await;
976        manager.check_bridge_health().await;
977        manager.check_bridge_health().await;
978
979        let bridges = manager.get_active_bridges().await;
980        assert_eq!(bridges[0].consecutive_failures, 3);
981    }
982
983    #[tokio::test]
984    async fn test_health_check_healthy_resets_failures() {
985        let config = HealthCheckConfig {
986            check_interval: Duration::from_secs(30),
987            degraded_threshold: Duration::from_secs(5),
988            unhealthy_threshold: Duration::from_secs(10),
989            log_warnings: false,
990        };
991        let manager = BridgeManager::with_health_config(config);
992        let identity = PeerIdentity {
993            pid: Some(1234),
994            uid: Some(1000),
995            gid: Some(1000),
996        };
997
998        manager.add_connection("test-conn", &identity).await;
999
1000        // Start with some failures
1001        {
1002            let mut bridges = manager.active_bridges.write().await;
1003            let status = bridges.get_mut("test-conn").unwrap();
1004            status.consecutive_failures = 5;
1005            status.health = HealthStatus::Unhealthy;
1006        }
1007
1008        // Bridge becomes active again (last_active is now)
1009        // Run health check - should reset to healthy
1010        manager.check_bridge_health().await;
1011
1012        let bridges = manager.get_active_bridges().await;
1013        assert_eq!(bridges[0].health, HealthStatus::Healthy);
1014        assert_eq!(bridges[0].consecutive_failures, 0);
1015    }
1016
1017    #[tokio::test]
1018    async fn test_remove_connection() {
1019        let manager = BridgeManager::new();
1020        let identity = PeerIdentity {
1021            pid: Some(1234),
1022            uid: Some(1000),
1023            gid: Some(1000),
1024        };
1025
1026        manager.add_connection("test-conn", &identity).await;
1027        assert_eq!(manager.get_active_bridges().await.len(), 1);
1028
1029        manager.remove_connection("test-conn").await;
1030        assert_eq!(manager.get_active_bridges().await.len(), 0);
1031    }
1032
1033    #[test]
1034    fn test_validate_bridge_id() {
1035        assert!(validate_bridge_id("telegram").is_ok());
1036        assert!(validate_bridge_id("discord-bot").is_ok());
1037        assert!(validate_bridge_id("whatsapp_2").is_ok());
1038        assert!(validate_bridge_id("bridge123").is_ok());
1039
1040        assert!(validate_bridge_id("").is_err());
1041        assert!(validate_bridge_id(&"x".repeat(65)).is_err());
1042        assert!(validate_bridge_id("bridge!@#").is_err());
1043        assert!(validate_bridge_id("bridge name").is_err());
1044    }
1045}