Skip to main content

localgpt_server/security/
bridge.rs

1use anyhow::Result;
2use chacha20poly1305::{
3    ChaCha20Poly1305, Key, Nonce,
4    aead::{Aead, KeyInit},
5};
6use chrono::{DateTime, Utc};
7use hmac::{Hmac, Mac};
8use localgpt_bridge::peer_identity::{PeerIdentity, get_peer_identity};
9use localgpt_bridge::{BridgeError, BridgeServer, BridgeService};
10use rand::RngExt;
11use serde::Serialize;
12use sha2::Sha256;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tarpc::context;
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21use localgpt_core::agent::{Agent, AgentConfig};
22use localgpt_core::config::Config;
23use localgpt_core::memory::MemoryManager;
24use localgpt_core::paths::Paths;
25use localgpt_core::security::read_device_key;
26
27/// Agent ID used for bridge CLI sessions.
28const BRIDGE_CLI_AGENT_ID: &str = "bridge-cli";
29
30/// Health status of a bridge connection
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
32#[serde(rename_all = "snake_case")]
33pub enum HealthStatus {
34    /// Bridge is actively communicating
35    Healthy,
36    /// Bridge hasn't been seen recently (warning)
37    Degraded,
38    /// Bridge is unresponsive (critical)
39    Unhealthy,
40}
41
42/// Status and health info for a connected bridge.
43#[derive(Debug, Clone, Serialize)]
44pub struct BridgeStatus {
45    pub connection_id: String,
46    pub bridge_id: Option<String>,
47    pub connected_at: DateTime<Utc>,
48    pub last_active: DateTime<Utc>,
49    pub pid: Option<i32>,
50    pub uid: Option<u32>,
51    /// Current health status based on last_active time
52    pub health: HealthStatus,
53    /// Number of consecutive health check failures
54    pub consecutive_failures: u32,
55}
56
57/// Configuration for bridge health monitoring
58#[derive(Debug, Clone)]
59pub struct HealthCheckConfig {
60    /// How often to check bridge health (default: 30s)
61    pub check_interval: Duration,
62    /// Time without activity before marking as degraded (default: 60s)
63    pub degraded_threshold: Duration,
64    /// Time without activity before marking as unhealthy (default: 120s)
65    pub unhealthy_threshold: Duration,
66    /// Whether to log warnings for unhealthy bridges
67    pub log_warnings: bool,
68}
69
70impl Default for HealthCheckConfig {
71    fn default() -> Self {
72        Self {
73            check_interval: Duration::from_secs(30),
74            degraded_threshold: Duration::from_secs(60),
75            unhealthy_threshold: Duration::from_secs(120),
76            log_warnings: true,
77        }
78    }
79}
80
81/// Shared agent session for bridge CLI connections.
82struct AgentSession {
83    agent: Agent,
84}
85
86/// Optional agent support for handling chat/memory RPCs.
87struct AgentSupport {
88    config: Config,
89    memory: Arc<MemoryManager>,
90    sessions: tokio::sync::Mutex<HashMap<String, AgentSession>>,
91}
92
93/// Manages bridge processes and their credentials.
94#[derive(Clone)]
95pub struct BridgeManager {
96    // In-memory cache of decrypted credentials
97    credentials: Arc<RwLock<HashMap<String, Vec<u8>>>>,
98    // Active connections: connection_id -> info
99    active_bridges: Arc<RwLock<HashMap<String, BridgeStatus>>>,
100    // Optional agent support for CLI bridge
101    agent_support: Option<Arc<AgentSupport>>,
102    // Health check configuration
103    health_config: HealthCheckConfig,
104}
105
106impl BridgeManager {
107    pub fn new() -> Self {
108        Self {
109            credentials: Arc::new(RwLock::new(HashMap::new())),
110            active_bridges: Arc::new(RwLock::new(HashMap::new())),
111            agent_support: None,
112            health_config: HealthCheckConfig::default(),
113        }
114    }
115
116    /// Create a BridgeManager with agent support for handling chat/memory RPCs.
117    /// This is used by the daemon when serving bridge CLI connections.
118    pub fn new_with_agent_support(config: Config, memory: MemoryManager) -> Self {
119        Self {
120            credentials: Arc::new(RwLock::new(HashMap::new())),
121            active_bridges: Arc::new(RwLock::new(HashMap::new())),
122            agent_support: Some(Arc::new(AgentSupport {
123                config,
124                memory: Arc::new(memory),
125                sessions: tokio::sync::Mutex::new(HashMap::new()),
126            })),
127            health_config: HealthCheckConfig::default(),
128        }
129    }
130
131    /// Create with custom health check configuration
132    pub fn with_health_config(config: HealthCheckConfig) -> Self {
133        Self {
134            credentials: Arc::new(RwLock::new(HashMap::new())),
135            active_bridges: Arc::new(RwLock::new(HashMap::new())),
136            agent_support: None,
137            health_config: config,
138        }
139    }
140
141    /// Start the background health check task
142    pub fn start_health_checker(&self) -> tokio::task::JoinHandle<()> {
143        let manager = self.clone();
144        let interval = self.health_config.check_interval;
145
146        tokio::spawn(async move {
147            let mut timer = tokio::time::interval(interval);
148            loop {
149                timer.tick().await;
150                manager.check_bridge_health().await;
151            }
152        })
153    }
154
155    /// Check health of all bridges and update their status
156    async fn check_bridge_health(&self) {
157        let now = Utc::now();
158        let config = &self.health_config;
159        let mut bridges = self.active_bridges.write().await;
160
161        for (_id, status) in bridges.iter_mut() {
162            let elapsed = (now - status.last_active)
163                .to_std()
164                .unwrap_or(Duration::ZERO);
165
166            let previous_health = status.health;
167            let previous_failures = status.consecutive_failures;
168
169            // Determine health based on elapsed time since last activity
170            if elapsed > config.unhealthy_threshold {
171                status.health = HealthStatus::Unhealthy;
172                status.consecutive_failures += 1;
173            } else if elapsed > config.degraded_threshold {
174                status.health = HealthStatus::Degraded;
175                status.consecutive_failures += 1;
176            } else {
177                status.health = HealthStatus::Healthy;
178                status.consecutive_failures = 0;
179            }
180
181            // Log warnings on state changes or continued unhealthy state
182            if config.log_warnings {
183                if status.health != previous_health {
184                    match status.health {
185                        HealthStatus::Degraded => {
186                            warn!(
187                                "Bridge {} (connection {}) is degraded - no activity for {:?}",
188                                status.bridge_id.as_deref().unwrap_or("unknown"),
189                                status.connection_id,
190                                elapsed
191                            );
192                        }
193                        HealthStatus::Unhealthy => {
194                            error!(
195                                "Bridge {} (connection {}) is unhealthy - no activity for {:?}",
196                                status.bridge_id.as_deref().unwrap_or("unknown"),
197                                status.connection_id,
198                                elapsed
199                            );
200                        }
201                        HealthStatus::Healthy => {
202                            info!(
203                                "Bridge {} (connection {}) is now healthy",
204                                status.bridge_id.as_deref().unwrap_or("unknown"),
205                                status.connection_id
206                            );
207                        }
208                    }
209                } else if status.health == HealthStatus::Unhealthy
210                    && status.consecutive_failures > previous_failures
211                    && status.consecutive_failures % 3 == 0
212                {
213                    // Log every 3rd consecutive failure
214                    error!(
215                        "Bridge {} (connection {}) still unhealthy (failures: {})",
216                        status.bridge_id.as_deref().unwrap_or("unknown"),
217                        status.connection_id,
218                        status.consecutive_failures
219                    );
220                }
221            }
222        }
223    }
224
225    /// Return status of all active bridge connections.
226    pub async fn get_active_bridges(&self) -> Vec<BridgeStatus> {
227        self.active_bridges.read().await.values().cloned().collect()
228    }
229
230    async fn add_connection(&self, id: &str, identity: &PeerIdentity) {
231        let status = BridgeStatus {
232            connection_id: id.to_string(),
233            bridge_id: None,
234            connected_at: Utc::now(),
235            last_active: Utc::now(),
236            pid: identity.pid,
237            uid: identity.uid,
238            health: HealthStatus::Healthy,
239            consecutive_failures: 0,
240        };
241        self.active_bridges
242            .write()
243            .await
244            .insert(id.to_string(), status);
245    }
246
247    async fn update_active(&self, id: &str, bridge_id: Option<String>) {
248        let mut active = self.active_bridges.write().await;
249        if let Some(status) = active.get_mut(id) {
250            status.last_active = Utc::now();
251            status.health = HealthStatus::Healthy;
252            status.consecutive_failures = 0;
253            if bridge_id.is_some() {
254                status.bridge_id = bridge_id;
255            }
256        }
257    }
258
259    async fn remove_connection(&self, id: &str) {
260        self.active_bridges.write().await.remove(id);
261    }
262
263    /// Register a new bridge secret.
264    /// Encrypts and saves to disk, and updates cache.
265    pub async fn register_bridge(&self, bridge_id: &str, secret: &[u8]) -> Result<()> {
266        validate_bridge_id(bridge_id)?;
267
268        let paths = Paths::resolve()?;
269        let bridges_dir = paths.data_dir.join("bridges");
270        std::fs::create_dir_all(&bridges_dir)?;
271
272        // 1. Get Master Key
273        let master_key = read_device_key(&paths.data_dir)?;
274
275        // 2. Derive Bridge Key = HMAC-SHA256(MasterKey, "bridge-key:" + bridge_id)
276        let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
277
278        // 3. Encrypt Secret
279        let cipher = ChaCha20Poly1305::new(&bridge_key);
280
281        // Generate nonce manually to avoid rand_core version mismatch
282        let mut nonce_bytes = [0u8; 12];
283        rand::rng().fill(&mut nonce_bytes);
284        let nonce = Nonce::from_slice(&nonce_bytes);
285
286        let ciphertext = cipher
287            .encrypt(nonce, secret)
288            .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
289
290        // 4. Save to file: [Nonce (12 bytes)][Ciphertext]
291        let mut file_content = nonce_bytes.to_vec();
292        file_content.extend(ciphertext);
293
294        let file_path = bridges_dir.join(format!("{}.enc", bridge_id));
295        std::fs::write(&file_path, file_content)?;
296
297        #[cfg(unix)]
298        {
299            use std::os::unix::fs::PermissionsExt;
300            std::fs::set_permissions(&file_path, std::fs::Permissions::from_mode(0o600))?;
301        }
302
303        // 5. Update Cache
304        let mut creds = self.credentials.write().await;
305        creds.insert(bridge_id.to_string(), secret.to_vec());
306
307        info!("Registered credentials for bridge: {}", bridge_id);
308        Ok(())
309    }
310
311    /// Retrieve credentials if the identity is authorized.
312    /// Loads from disk if not in cache.
313    pub async fn get_credentials_for(
314        &self,
315        bridge_id: &str,
316        identity: &PeerIdentity,
317    ) -> Result<Vec<u8>, BridgeError> {
318        if let Err(e) = validate_bridge_id(bridge_id) {
319            error!("Invalid bridge ID: {}", e);
320            return Err(BridgeError::AuthFailed("Invalid bridge ID".to_string()));
321        }
322
323        // Verify identity (Basic check for now)
324        // TODO: Implement stricter checks based on OS user or code signature
325        info!(
326            "Checking access for bridge: {} from {:?}",
327            bridge_id, identity
328        );
329
330        // Check cache first
331        {
332            let creds = self.credentials.read().await;
333            if let Some(secret) = creds.get(bridge_id) {
334                return Ok(secret.clone());
335            }
336        }
337
338        // Load from disk
339        match self.load_credentials_from_disk(bridge_id).await {
340            Ok(secret) => {
341                // Cache it
342                let mut creds = self.credentials.write().await;
343                creds.insert(bridge_id.to_string(), secret.clone());
344                Ok(secret)
345            }
346            Err(e) => {
347                error!("Failed to load credentials for {}: {}", bridge_id, e);
348                Err(BridgeError::NotRegistered)
349            }
350        }
351    }
352
353    async fn load_credentials_from_disk(&self, bridge_id: &str) -> Result<Vec<u8>> {
354        let paths = Paths::resolve()?;
355        let file_path = paths
356            .data_dir
357            .join("bridges")
358            .join(format!("{}.enc", bridge_id));
359
360        if !file_path.exists() {
361            anyhow::bail!("Credential file not found");
362        }
363
364        let file_content = std::fs::read(&file_path)?;
365        if file_content.len() < 12 {
366            anyhow::bail!("Invalid credential file format (too short)");
367        }
368
369        let (nonce_bytes, ciphertext) = file_content.split_at(12);
370        let nonce = Nonce::from_slice(nonce_bytes);
371
372        // Derive Key
373        let master_key = read_device_key(&paths.data_dir)?;
374        let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
375
376        // Decrypt
377        let cipher = ChaCha20Poly1305::new(&bridge_key);
378        let plaintext = cipher
379            .decrypt(nonce, ciphertext)
380            .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
381
382        Ok(plaintext)
383    }
384
385    /// Start the bridge server listening on the given socket path.
386    pub async fn serve(self, socket_path: &str) -> anyhow::Result<()> {
387        let listener = BridgeServer::bind(socket_path)?;
388        let manager = self.clone();
389
390        info!("BridgeManager listening on {}", socket_path);
391
392        loop {
393            let conn = match listener.accept().await {
394                Ok(c) => c,
395                Err(e) => {
396                    error!("Accept failed: {}", e);
397                    continue;
398                }
399            };
400
401            // Verify peer identity
402            let identity_result = {
403                #[cfg(unix)]
404                {
405                    get_peer_identity(&conn)
406                }
407                #[cfg(windows)]
408                {
409                    get_peer_identity(&conn)
410                }
411            };
412
413            let identity = match identity_result {
414                Ok(id) => {
415                    // Enforce UID matching (same-user only)
416                    #[cfg(unix)]
417                    {
418                        let current_uid = unsafe { libc::getuid() };
419                        if let Some(peer_uid) = id.uid.filter(|&uid| uid != current_uid) {
420                            error!(
421                                "Rejected connection from UID {} (expected {})",
422                                peer_uid, current_uid
423                            );
424                            continue;
425                        }
426                    }
427                    id
428                }
429                Err(e) => {
430                    error!("Peer identity verification failed: {}", e);
431                    continue;
432                }
433            };
434
435            info!("Accepted connection from: {:?}", identity);
436
437            let connection_id = Uuid::new_v4().to_string();
438            manager.add_connection(&connection_id, &identity).await;
439
440            let handler = ConnectionHandler {
441                manager: manager.clone(),
442                identity,
443                connection_id: connection_id.clone(),
444            };
445
446            let connection_manager = manager.clone();
447            tokio::spawn(async move {
448                if let Err(e) = localgpt_bridge::handle_connection(conn, handler).await {
449                    debug!("Connection handling finished/error: {:?}", e);
450                }
451                connection_manager.remove_connection(&connection_id).await;
452            });
453        }
454    }
455}
456
457impl Default for BridgeManager {
458    fn default() -> Self {
459        Self::new()
460    }
461}
462
463fn derive_bridge_key(master_key: &[u8; 32], bridge_id: &str) -> Result<Key> {
464    type HmacSha256 = Hmac<Sha256>;
465    // Disambiguate Mac vs KeyInit
466    let mut mac = <HmacSha256 as Mac>::new_from_slice(master_key)
467        .map_err(|e| anyhow::anyhow!("HMAC init failed: {}", e))?;
468
469    mac.update(b"bridge-key:");
470    mac.update(bridge_id.as_bytes());
471
472    let result = mac.finalize().into_bytes();
473    // ChaCha20Poly1305 key is 32 bytes, which matches SHA256 output size.
474    Ok(*Key::from_slice(&result))
475}
476
477/// Per-connection handler that implements the BridgeService trait.
478#[derive(Clone)]
479struct ConnectionHandler {
480    manager: BridgeManager,
481    identity: PeerIdentity,
482    connection_id: String,
483}
484
485impl BridgeService for ConnectionHandler {
486    async fn get_version(self, _: context::Context) -> String {
487        self.manager.update_active(&self.connection_id, None).await;
488        localgpt_bridge::BRIDGE_PROTOCOL_VERSION.to_string()
489    }
490
491    async fn ping(self, _: context::Context) -> bool {
492        self.manager.update_active(&self.connection_id, None).await;
493        true
494    }
495
496    async fn get_credentials(
497        self,
498        _: context::Context,
499        bridge_id: String,
500    ) -> Result<Vec<u8>, BridgeError> {
501        self.manager
502            .update_active(&self.connection_id, Some(bridge_id.clone()))
503            .await;
504        self.manager
505            .get_credentials_for(&bridge_id, &self.identity)
506            .await
507    }
508
509    async fn chat(
510        self,
511        _: context::Context,
512        session_id: String,
513        message: String,
514    ) -> Result<String, BridgeError> {
515        self.manager.update_active(&self.connection_id, None).await;
516        let support = self
517            .manager
518            .agent_support
519            .as_ref()
520            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
521
522        let mut sessions = support.sessions.lock().await;
523
524        // Create session if it doesn't exist, using entry API to avoid unwrap
525        if let std::collections::hash_map::Entry::Vacant(entry) = sessions.entry(session_id.clone())
526        {
527            let agent_config = AgentConfig {
528                model: support.config.agent.default_model.clone(),
529                context_window: support.config.agent.context_window,
530                reserve_tokens: support.config.agent.reserve_tokens,
531            };
532            let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
533                .await
534                .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
535            agent
536                .new_session()
537                .await
538                .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
539            entry.insert(AgentSession { agent });
540        }
541
542        let session = sessions
543            .get_mut(&session_id)
544            .ok_or_else(|| BridgeError::Internal("Session unexpectedly missing".into()))?;
545        let response = session
546            .agent
547            .chat(&message)
548            .await
549            .map_err(|e| BridgeError::Internal(format!("Chat error: {}", e)))?;
550
551        if let Err(e) = session
552            .agent
553            .save_session_for_agent(BRIDGE_CLI_AGENT_ID)
554            .await
555        {
556            warn!("Failed to save bridge-cli session: {}", e);
557        }
558
559        Ok(response)
560    }
561
562    async fn new_session(
563        self,
564        _: context::Context,
565        session_id: String,
566    ) -> Result<String, BridgeError> {
567        self.manager.update_active(&self.connection_id, None).await;
568        let support = self
569            .manager
570            .agent_support
571            .as_ref()
572            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
573
574        let mut sessions = support.sessions.lock().await;
575
576        let agent_config = AgentConfig {
577            model: support.config.agent.default_model.clone(),
578            context_window: support.config.agent.context_window,
579            reserve_tokens: support.config.agent.reserve_tokens,
580        };
581        let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
582            .await
583            .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
584        agent
585            .new_session()
586            .await
587            .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
588
589        let model = agent.model().to_string();
590        let chunks = agent.memory_chunk_count();
591        sessions.insert(session_id, AgentSession { agent });
592
593        Ok(format!(
594            "New session created. Model: {} | Memory: {} chunks",
595            model, chunks
596        ))
597    }
598
599    async fn session_status(
600        self,
601        _: context::Context,
602        session_id: String,
603    ) -> Result<String, BridgeError> {
604        self.manager.update_active(&self.connection_id, None).await;
605        let support = self
606            .manager
607            .agent_support
608            .as_ref()
609            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
610
611        let sessions = support.sessions.lock().await;
612        let session = sessions
613            .get(&session_id)
614            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
615
616        let status = session.agent.session_status();
617        let mut output = String::new();
618        output.push_str(&format!("Session ID: {}\n", status.id));
619        output.push_str(&format!("Model: {}\n", session.agent.model()));
620        output.push_str(&format!("Messages: {}\n", status.message_count));
621        output.push_str(&format!("Context tokens: ~{}\n", status.token_count));
622        output.push_str(&format!("Compactions: {}\n", status.compaction_count));
623        output.push_str(&format!(
624            "Memory chunks: {}",
625            session.agent.memory_chunk_count()
626        ));
627
628        if status.api_input_tokens > 0 || status.api_output_tokens > 0 {
629            output.push_str(&format!(
630                "\nAPI tokens: {} in / {} out",
631                status.api_input_tokens, status.api_output_tokens
632            ));
633        }
634
635        Ok(output)
636    }
637
638    async fn set_model(
639        self,
640        _: context::Context,
641        session_id: String,
642        model: String,
643    ) -> Result<String, BridgeError> {
644        self.manager.update_active(&self.connection_id, None).await;
645        let support = self
646            .manager
647            .agent_support
648            .as_ref()
649            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
650
651        let mut sessions = support.sessions.lock().await;
652        let session = sessions
653            .get_mut(&session_id)
654            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
655
656        session
657            .agent
658            .set_model(&model)
659            .map_err(|e| BridgeError::Internal(format!("Failed to set model: {}", e)))?;
660
661        Ok(format!("Switched to model: {}", model))
662    }
663
664    async fn compact_session(
665        self,
666        _: context::Context,
667        session_id: String,
668    ) -> Result<String, BridgeError> {
669        self.manager.update_active(&self.connection_id, None).await;
670        let support = self
671            .manager
672            .agent_support
673            .as_ref()
674            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
675
676        let mut sessions = support.sessions.lock().await;
677        let session = sessions
678            .get_mut(&session_id)
679            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
680
681        let (before, after) = session
682            .agent
683            .compact_session()
684            .await
685            .map_err(|e| BridgeError::Internal(format!("Failed to compact: {}", e)))?;
686
687        Ok(format!(
688            "Session compacted. Token count: {} → {}",
689            before, after
690        ))
691    }
692
693    async fn clear_session(
694        self,
695        _: context::Context,
696        session_id: String,
697    ) -> Result<String, BridgeError> {
698        self.manager.update_active(&self.connection_id, None).await;
699        let support = self
700            .manager
701            .agent_support
702            .as_ref()
703            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
704
705        let mut sessions = support.sessions.lock().await;
706        let session = sessions
707            .get_mut(&session_id)
708            .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
709
710        session.agent.clear_session();
711        Ok("Session cleared.".into())
712    }
713
714    async fn memory_search(
715        self,
716        _: context::Context,
717        query: String,
718        limit: u32,
719    ) -> Result<String, BridgeError> {
720        self.manager.update_active(&self.connection_id, None).await;
721        let support = self
722            .manager
723            .agent_support
724            .as_ref()
725            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
726
727        let results = support
728            .memory
729            .search(&query, limit as usize)
730            .map_err(|e| BridgeError::Internal(format!("Memory search failed: {}", e)))?;
731
732        if results.is_empty() {
733            return Ok(format!("No results found for '{}'", query));
734        }
735
736        let mut output = format!("Found {} results for '{}':\n", results.len(), query);
737        for (i, result) in results.iter().enumerate() {
738            output.push_str(&format!(
739                "\n{}. {} (lines {}-{})\n",
740                i + 1,
741                result.file,
742                result.line_start,
743                result.line_end
744            ));
745            output.push_str(&format!("   Score: {:.3}\n", result.score));
746            let preview: String = result.content.chars().take(200).collect();
747            let preview = preview.replace('\n', " ");
748            output.push_str(&format!(
749                "   {}{}\n",
750                preview,
751                if result.content.len() > 200 {
752                    "..."
753                } else {
754                    ""
755                }
756            ));
757        }
758
759        Ok(output)
760    }
761
762    async fn memory_stats(self, _: context::Context) -> Result<String, BridgeError> {
763        self.manager.update_active(&self.connection_id, None).await;
764        let support = self
765            .manager
766            .agent_support
767            .as_ref()
768            .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
769
770        let stats = support
771            .memory
772            .stats()
773            .map_err(|e| BridgeError::Internal(format!("Failed to get stats: {}", e)))?;
774
775        let mut output = String::new();
776        output.push_str("Memory Statistics\n");
777        output.push_str("-----------------\n");
778        output.push_str(&format!("Workspace: {}\n", stats.workspace));
779        output.push_str(&format!("Total files: {}\n", stats.total_files));
780        output.push_str(&format!("Total chunks: {}\n", stats.total_chunks));
781        output.push_str(&format!("Index size: {} KB\n", stats.index_size_kb));
782        output.push_str("\nFiles:\n");
783        for file in &stats.files {
784            output.push_str(&format!(
785                "  {} ({} chunks, {} lines)\n",
786                file.name, file.chunks, file.lines
787            ));
788        }
789
790        Ok(output)
791    }
792}
793
794fn validate_bridge_id(id: &str) -> Result<()> {
795    if id.is_empty() {
796        anyhow::bail!("Bridge ID cannot be empty");
797    }
798    if id.len() > 64 {
799        anyhow::bail!("Bridge ID is too long (max 64 chars)");
800    }
801    if !id
802        .chars()
803        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
804    {
805        anyhow::bail!("Bridge ID contains invalid characters. Allowed: a-z, A-Z, 0-9, -, _");
806    }
807    Ok(())
808}
809
810#[cfg(test)]
811mod tests {
812    use super::*;
813
814    #[test]
815    fn test_health_status_serialization() {
816        let healthy = HealthStatus::Healthy;
817        assert_eq!(serde_json::to_string(&healthy).unwrap(), "\"healthy\"");
818
819        let degraded = HealthStatus::Degraded;
820        assert_eq!(serde_json::to_string(&degraded).unwrap(), "\"degraded\"");
821
822        let unhealthy = HealthStatus::Unhealthy;
823        assert_eq!(serde_json::to_string(&unhealthy).unwrap(), "\"unhealthy\"");
824    }
825
826    #[test]
827    fn test_health_check_config_default() {
828        let config = HealthCheckConfig::default();
829        assert_eq!(config.check_interval, Duration::from_secs(30));
830        assert_eq!(config.degraded_threshold, Duration::from_secs(60));
831        assert_eq!(config.unhealthy_threshold, Duration::from_secs(120));
832        assert!(config.log_warnings);
833    }
834
835    #[tokio::test]
836    async fn test_bridge_status_initial_health() {
837        let manager = BridgeManager::new();
838        let identity = PeerIdentity {
839            pid: Some(1234),
840            uid: Some(1000),
841            gid: Some(1000),
842        };
843
844        manager.add_connection("test-conn", &identity).await;
845
846        let bridges = manager.get_active_bridges().await;
847        assert_eq!(bridges.len(), 1);
848        assert_eq!(bridges[0].health, HealthStatus::Healthy);
849        assert_eq!(bridges[0].consecutive_failures, 0);
850    }
851
852    #[tokio::test]
853    async fn test_update_active_resets_health() {
854        let manager = BridgeManager::new();
855        let identity = PeerIdentity {
856            pid: Some(1234),
857            uid: Some(1000),
858            gid: Some(1000),
859        };
860
861        manager.add_connection("test-conn", &identity).await;
862
863        // Simulate bridge going unhealthy
864        {
865            let mut bridges = manager.active_bridges.write().await;
866            let status = bridges.get_mut("test-conn").unwrap();
867            status.health = HealthStatus::Unhealthy;
868            status.consecutive_failures = 5;
869        }
870
871        // Update active should reset health
872        manager
873            .update_active("test-conn", Some("telegram".to_string()))
874            .await;
875
876        let bridges = manager.get_active_bridges().await;
877        assert_eq!(bridges[0].health, HealthStatus::Healthy);
878        assert_eq!(bridges[0].consecutive_failures, 0);
879        assert_eq!(bridges[0].bridge_id, Some("telegram".to_string()));
880    }
881
882    #[tokio::test]
883    async fn test_health_check_degraded() {
884        let config = HealthCheckConfig {
885            check_interval: Duration::from_secs(30),
886            degraded_threshold: Duration::from_secs(5),
887            unhealthy_threshold: Duration::from_secs(10),
888            log_warnings: false,
889        };
890        let manager = BridgeManager::with_health_config(config);
891        let identity = PeerIdentity {
892            pid: Some(1234),
893            uid: Some(1000),
894            gid: Some(1000),
895        };
896
897        manager.add_connection("test-conn", &identity).await;
898
899        // Simulate time passing by setting last_active to the past
900        {
901            let mut bridges = manager.active_bridges.write().await;
902            let status = bridges.get_mut("test-conn").unwrap();
903            // Set last_active to 7 seconds ago (past degraded threshold of 5s)
904            status.last_active = Utc::now() - chrono::Duration::seconds(7);
905        }
906
907        // Run health check
908        manager.check_bridge_health().await;
909
910        let bridges = manager.get_active_bridges().await;
911        assert_eq!(bridges[0].health, HealthStatus::Degraded);
912        assert_eq!(bridges[0].consecutive_failures, 1);
913    }
914
915    #[tokio::test]
916    async fn test_health_check_unhealthy() {
917        let config = HealthCheckConfig {
918            check_interval: Duration::from_secs(30),
919            degraded_threshold: Duration::from_secs(5),
920            unhealthy_threshold: Duration::from_secs(10),
921            log_warnings: false,
922        };
923        let manager = BridgeManager::with_health_config(config);
924        let identity = PeerIdentity {
925            pid: Some(1234),
926            uid: Some(1000),
927            gid: Some(1000),
928        };
929
930        manager.add_connection("test-conn", &identity).await;
931
932        // Simulate time passing by setting last_active to the past
933        {
934            let mut bridges = manager.active_bridges.write().await;
935            let status = bridges.get_mut("test-conn").unwrap();
936            // Set last_active to 15 seconds ago (past unhealthy threshold of 10s)
937            status.last_active = Utc::now() - chrono::Duration::seconds(15);
938        }
939
940        // Run health check
941        manager.check_bridge_health().await;
942
943        let bridges = manager.get_active_bridges().await;
944        assert_eq!(bridges[0].health, HealthStatus::Unhealthy);
945        assert_eq!(bridges[0].consecutive_failures, 1);
946    }
947
948    #[tokio::test]
949    async fn test_health_check_consecutive_failures() {
950        let config = HealthCheckConfig {
951            check_interval: Duration::from_secs(30),
952            degraded_threshold: Duration::from_secs(5),
953            unhealthy_threshold: Duration::from_secs(10),
954            log_warnings: false,
955        };
956        let manager = BridgeManager::with_health_config(config);
957        let identity = PeerIdentity {
958            pid: Some(1234),
959            uid: Some(1000),
960            gid: Some(1000),
961        };
962
963        manager.add_connection("test-conn", &identity).await;
964
965        // Simulate bridge that stays unhealthy
966        {
967            let mut bridges = manager.active_bridges.write().await;
968            let status = bridges.get_mut("test-conn").unwrap();
969            status.last_active = Utc::now() - chrono::Duration::seconds(15);
970        }
971
972        // Run health check 3 times
973        manager.check_bridge_health().await;
974        manager.check_bridge_health().await;
975        manager.check_bridge_health().await;
976
977        let bridges = manager.get_active_bridges().await;
978        assert_eq!(bridges[0].consecutive_failures, 3);
979    }
980
981    #[tokio::test]
982    async fn test_health_check_healthy_resets_failures() {
983        let config = HealthCheckConfig {
984            check_interval: Duration::from_secs(30),
985            degraded_threshold: Duration::from_secs(5),
986            unhealthy_threshold: Duration::from_secs(10),
987            log_warnings: false,
988        };
989        let manager = BridgeManager::with_health_config(config);
990        let identity = PeerIdentity {
991            pid: Some(1234),
992            uid: Some(1000),
993            gid: Some(1000),
994        };
995
996        manager.add_connection("test-conn", &identity).await;
997
998        // Start with some failures
999        {
1000            let mut bridges = manager.active_bridges.write().await;
1001            let status = bridges.get_mut("test-conn").unwrap();
1002            status.consecutive_failures = 5;
1003            status.health = HealthStatus::Unhealthy;
1004        }
1005
1006        // Bridge becomes active again (last_active is now)
1007        // Run health check - should reset to healthy
1008        manager.check_bridge_health().await;
1009
1010        let bridges = manager.get_active_bridges().await;
1011        assert_eq!(bridges[0].health, HealthStatus::Healthy);
1012        assert_eq!(bridges[0].consecutive_failures, 0);
1013    }
1014
1015    #[tokio::test]
1016    async fn test_remove_connection() {
1017        let manager = BridgeManager::new();
1018        let identity = PeerIdentity {
1019            pid: Some(1234),
1020            uid: Some(1000),
1021            gid: Some(1000),
1022        };
1023
1024        manager.add_connection("test-conn", &identity).await;
1025        assert_eq!(manager.get_active_bridges().await.len(), 1);
1026
1027        manager.remove_connection("test-conn").await;
1028        assert_eq!(manager.get_active_bridges().await.len(), 0);
1029    }
1030
1031    #[test]
1032    fn test_validate_bridge_id() {
1033        assert!(validate_bridge_id("telegram").is_ok());
1034        assert!(validate_bridge_id("discord-bot").is_ok());
1035        assert!(validate_bridge_id("whatsapp_2").is_ok());
1036        assert!(validate_bridge_id("bridge123").is_ok());
1037
1038        assert!(validate_bridge_id("").is_err());
1039        assert!(validate_bridge_id(&"x".repeat(65)).is_err());
1040        assert!(validate_bridge_id("bridge!@#").is_err());
1041        assert!(validate_bridge_id("bridge name").is_err());
1042    }
1043}