1use anyhow::Result;
2use chacha20poly1305::{
3 ChaCha20Poly1305, Key, Nonce,
4 aead::{Aead, KeyInit},
5};
6use chrono::{DateTime, Utc};
7use hmac::{Hmac, Mac};
8use localgpt_bridge::peer_identity::{PeerIdentity, get_peer_identity};
9use localgpt_bridge::{BridgeError, BridgeServer, BridgeService};
10use rand::RngExt;
11use serde::Serialize;
12use sha2::Sha256;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tarpc::context;
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21use localgpt_core::agent::{Agent, AgentConfig};
22use localgpt_core::config::Config;
23use localgpt_core::memory::MemoryManager;
24use localgpt_core::paths::Paths;
25use localgpt_core::security::read_device_key;
26
27const BRIDGE_CLI_AGENT_ID: &str = "bridge-cli";
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
32#[serde(rename_all = "snake_case")]
33pub enum HealthStatus {
34 Healthy,
36 Degraded,
38 Unhealthy,
40}
41
42#[derive(Debug, Clone, Serialize)]
44pub struct BridgeStatus {
45 pub connection_id: String,
46 pub bridge_id: Option<String>,
47 pub connected_at: DateTime<Utc>,
48 pub last_active: DateTime<Utc>,
49 pub pid: Option<i32>,
50 pub uid: Option<u32>,
51 pub health: HealthStatus,
53 pub consecutive_failures: u32,
55}
56
57#[derive(Debug, Clone)]
59pub struct HealthCheckConfig {
60 pub check_interval: Duration,
62 pub degraded_threshold: Duration,
64 pub unhealthy_threshold: Duration,
66 pub log_warnings: bool,
68}
69
70impl Default for HealthCheckConfig {
71 fn default() -> Self {
72 Self {
73 check_interval: Duration::from_secs(30),
74 degraded_threshold: Duration::from_secs(60),
75 unhealthy_threshold: Duration::from_secs(120),
76 log_warnings: true,
77 }
78 }
79}
80
81struct AgentSession {
83 agent: Agent,
84}
85
86struct AgentSupport {
88 config: Config,
89 memory: Arc<MemoryManager>,
90 sessions: tokio::sync::Mutex<HashMap<String, AgentSession>>,
91}
92
93#[derive(Clone)]
95pub struct BridgeManager {
96 credentials: Arc<RwLock<HashMap<String, Vec<u8>>>>,
98 active_bridges: Arc<RwLock<HashMap<String, BridgeStatus>>>,
100 agent_support: Option<Arc<AgentSupport>>,
102 health_config: HealthCheckConfig,
104}
105
106impl BridgeManager {
107 pub fn new() -> Self {
108 Self {
109 credentials: Arc::new(RwLock::new(HashMap::new())),
110 active_bridges: Arc::new(RwLock::new(HashMap::new())),
111 agent_support: None,
112 health_config: HealthCheckConfig::default(),
113 }
114 }
115
116 pub fn new_with_agent_support(config: Config, memory: MemoryManager) -> Self {
119 Self {
120 credentials: Arc::new(RwLock::new(HashMap::new())),
121 active_bridges: Arc::new(RwLock::new(HashMap::new())),
122 agent_support: Some(Arc::new(AgentSupport {
123 config,
124 memory: Arc::new(memory),
125 sessions: tokio::sync::Mutex::new(HashMap::new()),
126 })),
127 health_config: HealthCheckConfig::default(),
128 }
129 }
130
131 pub fn with_health_config(config: HealthCheckConfig) -> Self {
133 Self {
134 credentials: Arc::new(RwLock::new(HashMap::new())),
135 active_bridges: Arc::new(RwLock::new(HashMap::new())),
136 agent_support: None,
137 health_config: config,
138 }
139 }
140
141 pub fn start_health_checker(&self) -> tokio::task::JoinHandle<()> {
143 let manager = self.clone();
144 let interval = self.health_config.check_interval;
145
146 tokio::spawn(async move {
147 let mut timer = tokio::time::interval(interval);
148 loop {
149 timer.tick().await;
150 manager.check_bridge_health().await;
151 }
152 })
153 }
154
155 async fn check_bridge_health(&self) {
157 let now = Utc::now();
158 let config = &self.health_config;
159 let mut bridges = self.active_bridges.write().await;
160
161 for (_id, status) in bridges.iter_mut() {
162 let elapsed = (now - status.last_active)
163 .to_std()
164 .unwrap_or(Duration::ZERO);
165
166 let previous_health = status.health;
167 let previous_failures = status.consecutive_failures;
168
169 if elapsed > config.unhealthy_threshold {
171 status.health = HealthStatus::Unhealthy;
172 status.consecutive_failures += 1;
173 } else if elapsed > config.degraded_threshold {
174 status.health = HealthStatus::Degraded;
175 status.consecutive_failures += 1;
176 } else {
177 status.health = HealthStatus::Healthy;
178 status.consecutive_failures = 0;
179 }
180
181 if config.log_warnings {
183 if status.health != previous_health {
184 match status.health {
185 HealthStatus::Degraded => {
186 warn!(
187 "Bridge {} (connection {}) is degraded - no activity for {:?}",
188 status.bridge_id.as_deref().unwrap_or("unknown"),
189 status.connection_id,
190 elapsed
191 );
192 }
193 HealthStatus::Unhealthy => {
194 error!(
195 "Bridge {} (connection {}) is unhealthy - no activity for {:?}",
196 status.bridge_id.as_deref().unwrap_or("unknown"),
197 status.connection_id,
198 elapsed
199 );
200 }
201 HealthStatus::Healthy => {
202 info!(
203 "Bridge {} (connection {}) is now healthy",
204 status.bridge_id.as_deref().unwrap_or("unknown"),
205 status.connection_id
206 );
207 }
208 }
209 } else if status.health == HealthStatus::Unhealthy
210 && status.consecutive_failures > previous_failures
211 && status.consecutive_failures % 3 == 0
212 {
213 error!(
215 "Bridge {} (connection {}) still unhealthy (failures: {})",
216 status.bridge_id.as_deref().unwrap_or("unknown"),
217 status.connection_id,
218 status.consecutive_failures
219 );
220 }
221 }
222 }
223 }
224
225 pub async fn get_active_bridges(&self) -> Vec<BridgeStatus> {
227 self.active_bridges.read().await.values().cloned().collect()
228 }
229
230 async fn add_connection(&self, id: &str, identity: &PeerIdentity) {
231 let status = BridgeStatus {
232 connection_id: id.to_string(),
233 bridge_id: None,
234 connected_at: Utc::now(),
235 last_active: Utc::now(),
236 pid: identity.pid,
237 uid: identity.uid,
238 health: HealthStatus::Healthy,
239 consecutive_failures: 0,
240 };
241 self.active_bridges
242 .write()
243 .await
244 .insert(id.to_string(), status);
245 }
246
247 async fn update_active(&self, id: &str, bridge_id: Option<String>) {
248 let mut active = self.active_bridges.write().await;
249 if let Some(status) = active.get_mut(id) {
250 status.last_active = Utc::now();
251 status.health = HealthStatus::Healthy;
252 status.consecutive_failures = 0;
253 if bridge_id.is_some() {
254 status.bridge_id = bridge_id;
255 }
256 }
257 }
258
259 async fn remove_connection(&self, id: &str) {
260 self.active_bridges.write().await.remove(id);
261 }
262
263 pub async fn register_bridge(&self, bridge_id: &str, secret: &[u8]) -> Result<()> {
266 validate_bridge_id(bridge_id)?;
267
268 let paths = Paths::resolve()?;
269 let bridges_dir = paths.data_dir.join("bridges");
270 std::fs::create_dir_all(&bridges_dir)?;
271
272 let master_key = read_device_key(&paths.data_dir)?;
274
275 let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
277
278 let cipher = ChaCha20Poly1305::new(&bridge_key);
280
281 let mut nonce_bytes = [0u8; 12];
283 rand::rng().fill(&mut nonce_bytes);
284 let nonce = Nonce::from_slice(&nonce_bytes);
285
286 let ciphertext = cipher
287 .encrypt(nonce, secret)
288 .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
289
290 let mut file_content = nonce_bytes.to_vec();
292 file_content.extend(ciphertext);
293
294 let file_path = bridges_dir.join(format!("{}.enc", bridge_id));
295 std::fs::write(&file_path, file_content)?;
296
297 #[cfg(unix)]
298 {
299 use std::os::unix::fs::PermissionsExt;
300 std::fs::set_permissions(&file_path, std::fs::Permissions::from_mode(0o600))?;
301 }
302
303 let mut creds = self.credentials.write().await;
305 creds.insert(bridge_id.to_string(), secret.to_vec());
306
307 info!("Registered credentials for bridge: {}", bridge_id);
308 Ok(())
309 }
310
311 pub async fn get_credentials_for(
314 &self,
315 bridge_id: &str,
316 identity: &PeerIdentity,
317 ) -> Result<Vec<u8>, BridgeError> {
318 if let Err(e) = validate_bridge_id(bridge_id) {
319 error!("Invalid bridge ID: {}", e);
320 return Err(BridgeError::AuthFailed("Invalid bridge ID".to_string()));
321 }
322
323 info!(
326 "Checking access for bridge: {} from {:?}",
327 bridge_id, identity
328 );
329
330 {
332 let creds = self.credentials.read().await;
333 if let Some(secret) = creds.get(bridge_id) {
334 return Ok(secret.clone());
335 }
336 }
337
338 match self.load_credentials_from_disk(bridge_id).await {
340 Ok(secret) => {
341 let mut creds = self.credentials.write().await;
343 creds.insert(bridge_id.to_string(), secret.clone());
344 Ok(secret)
345 }
346 Err(e) => {
347 error!("Failed to load credentials for {}: {}", bridge_id, e);
348 Err(BridgeError::NotRegistered)
349 }
350 }
351 }
352
353 async fn load_credentials_from_disk(&self, bridge_id: &str) -> Result<Vec<u8>> {
354 let paths = Paths::resolve()?;
355 let file_path = paths
356 .data_dir
357 .join("bridges")
358 .join(format!("{}.enc", bridge_id));
359
360 if !file_path.exists() {
361 anyhow::bail!("Credential file not found");
362 }
363
364 let file_content = std::fs::read(&file_path)?;
365 if file_content.len() < 12 {
366 anyhow::bail!("Invalid credential file format (too short)");
367 }
368
369 let (nonce_bytes, ciphertext) = file_content.split_at(12);
370 let nonce = Nonce::from_slice(nonce_bytes);
371
372 let master_key = read_device_key(&paths.data_dir)?;
374 let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
375
376 let cipher = ChaCha20Poly1305::new(&bridge_key);
378 let plaintext = cipher
379 .decrypt(nonce, ciphertext)
380 .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
381
382 Ok(plaintext)
383 }
384
385 pub async fn serve(self, socket_path: &str) -> anyhow::Result<()> {
387 let listener = BridgeServer::bind(socket_path)?;
388 let manager = self.clone();
389
390 info!("BridgeManager listening on {}", socket_path);
391
392 loop {
393 let conn = match listener.accept().await {
394 Ok(c) => c,
395 Err(e) => {
396 error!("Accept failed: {}", e);
397 continue;
398 }
399 };
400
401 let identity_result = {
403 #[cfg(unix)]
404 {
405 get_peer_identity(&conn)
406 }
407 #[cfg(windows)]
408 {
409 get_peer_identity(&conn)
410 }
411 };
412
413 let identity = match identity_result {
414 Ok(id) => {
415 #[cfg(unix)]
417 {
418 let current_uid = unsafe { libc::getuid() };
419 if let Some(peer_uid) = id.uid.filter(|&uid| uid != current_uid) {
420 error!(
421 "Rejected connection from UID {} (expected {})",
422 peer_uid, current_uid
423 );
424 continue;
425 }
426 }
427 id
428 }
429 Err(e) => {
430 error!("Peer identity verification failed: {}", e);
431 continue;
432 }
433 };
434
435 info!("Accepted connection from: {:?}", identity);
436
437 let connection_id = Uuid::new_v4().to_string();
438 manager.add_connection(&connection_id, &identity).await;
439
440 let handler = ConnectionHandler {
441 manager: manager.clone(),
442 identity,
443 connection_id: connection_id.clone(),
444 };
445
446 let connection_manager = manager.clone();
447 tokio::spawn(async move {
448 if let Err(e) = localgpt_bridge::handle_connection(conn, handler).await {
449 debug!("Connection handling finished/error: {:?}", e);
450 }
451 connection_manager.remove_connection(&connection_id).await;
452 });
453 }
454 }
455}
456
457impl Default for BridgeManager {
458 fn default() -> Self {
459 Self::new()
460 }
461}
462
463fn derive_bridge_key(master_key: &[u8; 32], bridge_id: &str) -> Result<Key> {
464 type HmacSha256 = Hmac<Sha256>;
465 let mut mac = <HmacSha256 as Mac>::new_from_slice(master_key)
467 .map_err(|e| anyhow::anyhow!("HMAC init failed: {}", e))?;
468
469 mac.update(b"bridge-key:");
470 mac.update(bridge_id.as_bytes());
471
472 let result = mac.finalize().into_bytes();
473 Ok(*Key::from_slice(&result))
475}
476
477#[derive(Clone)]
479struct ConnectionHandler {
480 manager: BridgeManager,
481 identity: PeerIdentity,
482 connection_id: String,
483}
484
485impl BridgeService for ConnectionHandler {
486 async fn get_version(self, _: context::Context) -> String {
487 self.manager.update_active(&self.connection_id, None).await;
488 localgpt_bridge::BRIDGE_PROTOCOL_VERSION.to_string()
489 }
490
491 async fn ping(self, _: context::Context) -> bool {
492 self.manager.update_active(&self.connection_id, None).await;
493 true
494 }
495
496 async fn get_credentials(
497 self,
498 _: context::Context,
499 bridge_id: String,
500 ) -> Result<Vec<u8>, BridgeError> {
501 self.manager
502 .update_active(&self.connection_id, Some(bridge_id.clone()))
503 .await;
504 self.manager
505 .get_credentials_for(&bridge_id, &self.identity)
506 .await
507 }
508
509 async fn chat(
510 self,
511 _: context::Context,
512 session_id: String,
513 message: String,
514 ) -> Result<String, BridgeError> {
515 self.manager.update_active(&self.connection_id, None).await;
516 let support = self
517 .manager
518 .agent_support
519 .as_ref()
520 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
521
522 let mut sessions = support.sessions.lock().await;
523
524 if let std::collections::hash_map::Entry::Vacant(entry) = sessions.entry(session_id.clone())
526 {
527 let agent_config = AgentConfig {
528 model: support.config.agent.default_model.clone(),
529 context_window: support.config.agent.context_window,
530 reserve_tokens: support.config.agent.reserve_tokens,
531 };
532 let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
533 .await
534 .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
535 agent
536 .new_session()
537 .await
538 .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
539 entry.insert(AgentSession { agent });
540 }
541
542 let session = sessions
543 .get_mut(&session_id)
544 .ok_or_else(|| BridgeError::Internal("Session unexpectedly missing".into()))?;
545 let response = session
546 .agent
547 .chat(&message)
548 .await
549 .map_err(|e| BridgeError::Internal(format!("Chat error: {}", e)))?;
550
551 if let Err(e) = session
552 .agent
553 .save_session_for_agent(BRIDGE_CLI_AGENT_ID)
554 .await
555 {
556 warn!("Failed to save bridge-cli session: {}", e);
557 }
558
559 Ok(response)
560 }
561
562 async fn new_session(
563 self,
564 _: context::Context,
565 session_id: String,
566 ) -> Result<String, BridgeError> {
567 self.manager.update_active(&self.connection_id, None).await;
568 let support = self
569 .manager
570 .agent_support
571 .as_ref()
572 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
573
574 let mut sessions = support.sessions.lock().await;
575
576 let agent_config = AgentConfig {
577 model: support.config.agent.default_model.clone(),
578 context_window: support.config.agent.context_window,
579 reserve_tokens: support.config.agent.reserve_tokens,
580 };
581 let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
582 .await
583 .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
584 agent
585 .new_session()
586 .await
587 .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
588
589 let model = agent.model().to_string();
590 let chunks = agent.memory_chunk_count();
591 sessions.insert(session_id, AgentSession { agent });
592
593 Ok(format!(
594 "New session created. Model: {} | Memory: {} chunks",
595 model, chunks
596 ))
597 }
598
599 async fn session_status(
600 self,
601 _: context::Context,
602 session_id: String,
603 ) -> Result<String, BridgeError> {
604 self.manager.update_active(&self.connection_id, None).await;
605 let support = self
606 .manager
607 .agent_support
608 .as_ref()
609 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
610
611 let sessions = support.sessions.lock().await;
612 let session = sessions
613 .get(&session_id)
614 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
615
616 let status = session.agent.session_status();
617 let mut output = String::new();
618 output.push_str(&format!("Session ID: {}\n", status.id));
619 output.push_str(&format!("Model: {}\n", session.agent.model()));
620 output.push_str(&format!("Messages: {}\n", status.message_count));
621 output.push_str(&format!("Context tokens: ~{}\n", status.token_count));
622 output.push_str(&format!("Compactions: {}\n", status.compaction_count));
623 output.push_str(&format!(
624 "Memory chunks: {}",
625 session.agent.memory_chunk_count()
626 ));
627
628 if status.api_input_tokens > 0 || status.api_output_tokens > 0 {
629 output.push_str(&format!(
630 "\nAPI tokens: {} in / {} out",
631 status.api_input_tokens, status.api_output_tokens
632 ));
633 }
634
635 Ok(output)
636 }
637
638 async fn set_model(
639 self,
640 _: context::Context,
641 session_id: String,
642 model: String,
643 ) -> Result<String, BridgeError> {
644 self.manager.update_active(&self.connection_id, None).await;
645 let support = self
646 .manager
647 .agent_support
648 .as_ref()
649 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
650
651 let mut sessions = support.sessions.lock().await;
652 let session = sessions
653 .get_mut(&session_id)
654 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
655
656 session
657 .agent
658 .set_model(&model)
659 .map_err(|e| BridgeError::Internal(format!("Failed to set model: {}", e)))?;
660
661 Ok(format!("Switched to model: {}", model))
662 }
663
664 async fn compact_session(
665 self,
666 _: context::Context,
667 session_id: String,
668 ) -> Result<String, BridgeError> {
669 self.manager.update_active(&self.connection_id, None).await;
670 let support = self
671 .manager
672 .agent_support
673 .as_ref()
674 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
675
676 let mut sessions = support.sessions.lock().await;
677 let session = sessions
678 .get_mut(&session_id)
679 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
680
681 let (before, after) = session
682 .agent
683 .compact_session()
684 .await
685 .map_err(|e| BridgeError::Internal(format!("Failed to compact: {}", e)))?;
686
687 Ok(format!(
688 "Session compacted. Token count: {} → {}",
689 before, after
690 ))
691 }
692
693 async fn clear_session(
694 self,
695 _: context::Context,
696 session_id: String,
697 ) -> Result<String, BridgeError> {
698 self.manager.update_active(&self.connection_id, None).await;
699 let support = self
700 .manager
701 .agent_support
702 .as_ref()
703 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
704
705 let mut sessions = support.sessions.lock().await;
706 let session = sessions
707 .get_mut(&session_id)
708 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
709
710 session.agent.clear_session();
711 Ok("Session cleared.".into())
712 }
713
714 async fn memory_search(
715 self,
716 _: context::Context,
717 query: String,
718 limit: u32,
719 ) -> Result<String, BridgeError> {
720 self.manager.update_active(&self.connection_id, None).await;
721 let support = self
722 .manager
723 .agent_support
724 .as_ref()
725 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
726
727 let results = support
728 .memory
729 .search(&query, limit as usize)
730 .map_err(|e| BridgeError::Internal(format!("Memory search failed: {}", e)))?;
731
732 if results.is_empty() {
733 return Ok(format!("No results found for '{}'", query));
734 }
735
736 let mut output = format!("Found {} results for '{}':\n", results.len(), query);
737 for (i, result) in results.iter().enumerate() {
738 output.push_str(&format!(
739 "\n{}. {} (lines {}-{})\n",
740 i + 1,
741 result.file,
742 result.line_start,
743 result.line_end
744 ));
745 output.push_str(&format!(" Score: {:.3}\n", result.score));
746 let preview: String = result.content.chars().take(200).collect();
747 let preview = preview.replace('\n', " ");
748 output.push_str(&format!(
749 " {}{}\n",
750 preview,
751 if result.content.len() > 200 {
752 "..."
753 } else {
754 ""
755 }
756 ));
757 }
758
759 Ok(output)
760 }
761
762 async fn memory_stats(self, _: context::Context) -> Result<String, BridgeError> {
763 self.manager.update_active(&self.connection_id, None).await;
764 let support = self
765 .manager
766 .agent_support
767 .as_ref()
768 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
769
770 let stats = support
771 .memory
772 .stats()
773 .map_err(|e| BridgeError::Internal(format!("Failed to get stats: {}", e)))?;
774
775 let mut output = String::new();
776 output.push_str("Memory Statistics\n");
777 output.push_str("-----------------\n");
778 output.push_str(&format!("Workspace: {}\n", stats.workspace));
779 output.push_str(&format!("Total files: {}\n", stats.total_files));
780 output.push_str(&format!("Total chunks: {}\n", stats.total_chunks));
781 output.push_str(&format!("Index size: {} KB\n", stats.index_size_kb));
782 output.push_str("\nFiles:\n");
783 for file in &stats.files {
784 output.push_str(&format!(
785 " {} ({} chunks, {} lines)\n",
786 file.name, file.chunks, file.lines
787 ));
788 }
789
790 Ok(output)
791 }
792}
793
794fn validate_bridge_id(id: &str) -> Result<()> {
795 if id.is_empty() {
796 anyhow::bail!("Bridge ID cannot be empty");
797 }
798 if id.len() > 64 {
799 anyhow::bail!("Bridge ID is too long (max 64 chars)");
800 }
801 if !id
802 .chars()
803 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
804 {
805 anyhow::bail!("Bridge ID contains invalid characters. Allowed: a-z, A-Z, 0-9, -, _");
806 }
807 Ok(())
808}
809
810#[cfg(test)]
811mod tests {
812 use super::*;
813
814 #[test]
815 fn test_health_status_serialization() {
816 let healthy = HealthStatus::Healthy;
817 assert_eq!(serde_json::to_string(&healthy).unwrap(), "\"healthy\"");
818
819 let degraded = HealthStatus::Degraded;
820 assert_eq!(serde_json::to_string(°raded).unwrap(), "\"degraded\"");
821
822 let unhealthy = HealthStatus::Unhealthy;
823 assert_eq!(serde_json::to_string(&unhealthy).unwrap(), "\"unhealthy\"");
824 }
825
826 #[test]
827 fn test_health_check_config_default() {
828 let config = HealthCheckConfig::default();
829 assert_eq!(config.check_interval, Duration::from_secs(30));
830 assert_eq!(config.degraded_threshold, Duration::from_secs(60));
831 assert_eq!(config.unhealthy_threshold, Duration::from_secs(120));
832 assert!(config.log_warnings);
833 }
834
835 #[tokio::test]
836 async fn test_bridge_status_initial_health() {
837 let manager = BridgeManager::new();
838 let identity = PeerIdentity {
839 pid: Some(1234),
840 uid: Some(1000),
841 gid: Some(1000),
842 };
843
844 manager.add_connection("test-conn", &identity).await;
845
846 let bridges = manager.get_active_bridges().await;
847 assert_eq!(bridges.len(), 1);
848 assert_eq!(bridges[0].health, HealthStatus::Healthy);
849 assert_eq!(bridges[0].consecutive_failures, 0);
850 }
851
852 #[tokio::test]
853 async fn test_update_active_resets_health() {
854 let manager = BridgeManager::new();
855 let identity = PeerIdentity {
856 pid: Some(1234),
857 uid: Some(1000),
858 gid: Some(1000),
859 };
860
861 manager.add_connection("test-conn", &identity).await;
862
863 {
865 let mut bridges = manager.active_bridges.write().await;
866 let status = bridges.get_mut("test-conn").unwrap();
867 status.health = HealthStatus::Unhealthy;
868 status.consecutive_failures = 5;
869 }
870
871 manager
873 .update_active("test-conn", Some("telegram".to_string()))
874 .await;
875
876 let bridges = manager.get_active_bridges().await;
877 assert_eq!(bridges[0].health, HealthStatus::Healthy);
878 assert_eq!(bridges[0].consecutive_failures, 0);
879 assert_eq!(bridges[0].bridge_id, Some("telegram".to_string()));
880 }
881
882 #[tokio::test]
883 async fn test_health_check_degraded() {
884 let config = HealthCheckConfig {
885 check_interval: Duration::from_secs(30),
886 degraded_threshold: Duration::from_secs(5),
887 unhealthy_threshold: Duration::from_secs(10),
888 log_warnings: false,
889 };
890 let manager = BridgeManager::with_health_config(config);
891 let identity = PeerIdentity {
892 pid: Some(1234),
893 uid: Some(1000),
894 gid: Some(1000),
895 };
896
897 manager.add_connection("test-conn", &identity).await;
898
899 {
901 let mut bridges = manager.active_bridges.write().await;
902 let status = bridges.get_mut("test-conn").unwrap();
903 status.last_active = Utc::now() - chrono::Duration::seconds(7);
905 }
906
907 manager.check_bridge_health().await;
909
910 let bridges = manager.get_active_bridges().await;
911 assert_eq!(bridges[0].health, HealthStatus::Degraded);
912 assert_eq!(bridges[0].consecutive_failures, 1);
913 }
914
915 #[tokio::test]
916 async fn test_health_check_unhealthy() {
917 let config = HealthCheckConfig {
918 check_interval: Duration::from_secs(30),
919 degraded_threshold: Duration::from_secs(5),
920 unhealthy_threshold: Duration::from_secs(10),
921 log_warnings: false,
922 };
923 let manager = BridgeManager::with_health_config(config);
924 let identity = PeerIdentity {
925 pid: Some(1234),
926 uid: Some(1000),
927 gid: Some(1000),
928 };
929
930 manager.add_connection("test-conn", &identity).await;
931
932 {
934 let mut bridges = manager.active_bridges.write().await;
935 let status = bridges.get_mut("test-conn").unwrap();
936 status.last_active = Utc::now() - chrono::Duration::seconds(15);
938 }
939
940 manager.check_bridge_health().await;
942
943 let bridges = manager.get_active_bridges().await;
944 assert_eq!(bridges[0].health, HealthStatus::Unhealthy);
945 assert_eq!(bridges[0].consecutive_failures, 1);
946 }
947
948 #[tokio::test]
949 async fn test_health_check_consecutive_failures() {
950 let config = HealthCheckConfig {
951 check_interval: Duration::from_secs(30),
952 degraded_threshold: Duration::from_secs(5),
953 unhealthy_threshold: Duration::from_secs(10),
954 log_warnings: false,
955 };
956 let manager = BridgeManager::with_health_config(config);
957 let identity = PeerIdentity {
958 pid: Some(1234),
959 uid: Some(1000),
960 gid: Some(1000),
961 };
962
963 manager.add_connection("test-conn", &identity).await;
964
965 {
967 let mut bridges = manager.active_bridges.write().await;
968 let status = bridges.get_mut("test-conn").unwrap();
969 status.last_active = Utc::now() - chrono::Duration::seconds(15);
970 }
971
972 manager.check_bridge_health().await;
974 manager.check_bridge_health().await;
975 manager.check_bridge_health().await;
976
977 let bridges = manager.get_active_bridges().await;
978 assert_eq!(bridges[0].consecutive_failures, 3);
979 }
980
981 #[tokio::test]
982 async fn test_health_check_healthy_resets_failures() {
983 let config = HealthCheckConfig {
984 check_interval: Duration::from_secs(30),
985 degraded_threshold: Duration::from_secs(5),
986 unhealthy_threshold: Duration::from_secs(10),
987 log_warnings: false,
988 };
989 let manager = BridgeManager::with_health_config(config);
990 let identity = PeerIdentity {
991 pid: Some(1234),
992 uid: Some(1000),
993 gid: Some(1000),
994 };
995
996 manager.add_connection("test-conn", &identity).await;
997
998 {
1000 let mut bridges = manager.active_bridges.write().await;
1001 let status = bridges.get_mut("test-conn").unwrap();
1002 status.consecutive_failures = 5;
1003 status.health = HealthStatus::Unhealthy;
1004 }
1005
1006 manager.check_bridge_health().await;
1009
1010 let bridges = manager.get_active_bridges().await;
1011 assert_eq!(bridges[0].health, HealthStatus::Healthy);
1012 assert_eq!(bridges[0].consecutive_failures, 0);
1013 }
1014
1015 #[tokio::test]
1016 async fn test_remove_connection() {
1017 let manager = BridgeManager::new();
1018 let identity = PeerIdentity {
1019 pid: Some(1234),
1020 uid: Some(1000),
1021 gid: Some(1000),
1022 };
1023
1024 manager.add_connection("test-conn", &identity).await;
1025 assert_eq!(manager.get_active_bridges().await.len(), 1);
1026
1027 manager.remove_connection("test-conn").await;
1028 assert_eq!(manager.get_active_bridges().await.len(), 0);
1029 }
1030
1031 #[test]
1032 fn test_validate_bridge_id() {
1033 assert!(validate_bridge_id("telegram").is_ok());
1034 assert!(validate_bridge_id("discord-bot").is_ok());
1035 assert!(validate_bridge_id("whatsapp_2").is_ok());
1036 assert!(validate_bridge_id("bridge123").is_ok());
1037
1038 assert!(validate_bridge_id("").is_err());
1039 assert!(validate_bridge_id(&"x".repeat(65)).is_err());
1040 assert!(validate_bridge_id("bridge!@#").is_err());
1041 assert!(validate_bridge_id("bridge name").is_err());
1042 }
1043}