1use anyhow::Result;
2use chacha20poly1305::{
3 ChaCha20Poly1305, Key, Nonce,
4 aead::{Aead, KeyInit},
5};
6use chrono::{DateTime, Utc};
7use hmac::{Hmac, Mac};
8use localgpt_bridge::peer_identity::{PeerIdentity, get_peer_identity};
9use localgpt_bridge::{BridgeError, BridgeServer, BridgeService};
10use rand::RngExt;
11use serde::Serialize;
12use sha2::Sha256;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::Duration;
16use tarpc::context;
17use tokio::sync::RwLock;
18use tracing::{debug, error, info, warn};
19use uuid::Uuid;
20
21use localgpt_core::agent::{Agent, AgentConfig};
22use localgpt_core::config::Config;
23use localgpt_core::memory::MemoryManager;
24use localgpt_core::paths::Paths;
25use localgpt_core::security::read_device_key;
26
27const BRIDGE_CLI_AGENT_ID: &str = "bridge-cli";
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
32#[serde(rename_all = "snake_case")]
33pub enum HealthStatus {
34 Healthy,
36 Degraded,
38 Unhealthy,
40}
41
42#[derive(Debug, Clone, Serialize)]
44pub struct BridgeStatus {
45 pub connection_id: String,
46 pub bridge_id: Option<String>,
47 pub connected_at: DateTime<Utc>,
48 pub last_active: DateTime<Utc>,
49 pub pid: Option<i32>,
50 pub uid: Option<u32>,
51 pub health: HealthStatus,
53 pub consecutive_failures: u32,
55}
56
57#[derive(Debug, Clone)]
59pub struct HealthCheckConfig {
60 pub check_interval: Duration,
62 pub degraded_threshold: Duration,
64 pub unhealthy_threshold: Duration,
66 pub log_warnings: bool,
68}
69
70impl Default for HealthCheckConfig {
71 fn default() -> Self {
72 Self {
73 check_interval: Duration::from_secs(30),
74 degraded_threshold: Duration::from_secs(60),
75 unhealthy_threshold: Duration::from_secs(120),
76 log_warnings: true,
77 }
78 }
79}
80
81struct AgentSession {
83 agent: Agent,
84}
85
86struct AgentSupport {
88 config: Config,
89 memory: Arc<MemoryManager>,
90 sessions: tokio::sync::Mutex<HashMap<String, AgentSession>>,
91}
92
93#[derive(Clone)]
95pub struct BridgeManager {
96 credentials: Arc<RwLock<HashMap<String, Vec<u8>>>>,
98 active_bridges: Arc<RwLock<HashMap<String, BridgeStatus>>>,
100 agent_support: Option<Arc<AgentSupport>>,
102 health_config: HealthCheckConfig,
104}
105
106impl BridgeManager {
107 pub fn new() -> Self {
108 Self {
109 credentials: Arc::new(RwLock::new(HashMap::new())),
110 active_bridges: Arc::new(RwLock::new(HashMap::new())),
111 agent_support: None,
112 health_config: HealthCheckConfig::default(),
113 }
114 }
115
116 pub fn new_with_agent_support(config: Config, memory: MemoryManager) -> Self {
119 Self {
120 credentials: Arc::new(RwLock::new(HashMap::new())),
121 active_bridges: Arc::new(RwLock::new(HashMap::new())),
122 agent_support: Some(Arc::new(AgentSupport {
123 config,
124 memory: Arc::new(memory),
125 sessions: tokio::sync::Mutex::new(HashMap::new()),
126 })),
127 health_config: HealthCheckConfig::default(),
128 }
129 }
130
131 pub fn with_health_config(config: HealthCheckConfig) -> Self {
133 Self {
134 credentials: Arc::new(RwLock::new(HashMap::new())),
135 active_bridges: Arc::new(RwLock::new(HashMap::new())),
136 agent_support: None,
137 health_config: config,
138 }
139 }
140
141 pub fn start_health_checker(&self) -> tokio::task::JoinHandle<()> {
143 let manager = self.clone();
144 let interval = self.health_config.check_interval;
145
146 tokio::spawn(async move {
147 let mut timer = tokio::time::interval(interval);
148 loop {
149 timer.tick().await;
150 manager.check_bridge_health().await;
151 }
152 })
153 }
154
155 async fn check_bridge_health(&self) {
157 let now = Utc::now();
158 let config = &self.health_config;
159 let mut bridges = self.active_bridges.write().await;
160
161 for (_id, status) in bridges.iter_mut() {
162 let elapsed = (now - status.last_active)
163 .to_std()
164 .unwrap_or(Duration::ZERO);
165
166 let previous_health = status.health;
167 let previous_failures = status.consecutive_failures;
168
169 if elapsed > config.unhealthy_threshold {
171 status.health = HealthStatus::Unhealthy;
172 status.consecutive_failures += 1;
173 } else if elapsed > config.degraded_threshold {
174 status.health = HealthStatus::Degraded;
175 status.consecutive_failures += 1;
176 } else {
177 status.health = HealthStatus::Healthy;
178 status.consecutive_failures = 0;
179 }
180
181 if config.log_warnings {
183 if status.health != previous_health {
184 match status.health {
185 HealthStatus::Degraded => {
186 warn!(
187 "Bridge {} (connection {}) is degraded - no activity for {:?}",
188 status.bridge_id.as_deref().unwrap_or("unknown"),
189 status.connection_id,
190 elapsed
191 );
192 }
193 HealthStatus::Unhealthy => {
194 error!(
195 "Bridge {} (connection {}) is unhealthy - no activity for {:?}",
196 status.bridge_id.as_deref().unwrap_or("unknown"),
197 status.connection_id,
198 elapsed
199 );
200 }
201 HealthStatus::Healthy => {
202 info!(
203 "Bridge {} (connection {}) is now healthy",
204 status.bridge_id.as_deref().unwrap_or("unknown"),
205 status.connection_id
206 );
207 }
208 }
209 } else if status.health == HealthStatus::Unhealthy
210 && status.consecutive_failures > previous_failures
211 && status.consecutive_failures % 3 == 0
212 {
213 error!(
215 "Bridge {} (connection {}) still unhealthy (failures: {})",
216 status.bridge_id.as_deref().unwrap_or("unknown"),
217 status.connection_id,
218 status.consecutive_failures
219 );
220 }
221 }
222 }
223 }
224
225 pub async fn get_active_bridges(&self) -> Vec<BridgeStatus> {
227 self.active_bridges.read().await.values().cloned().collect()
228 }
229
230 async fn add_connection(&self, id: &str, identity: &PeerIdentity) {
231 let status = BridgeStatus {
232 connection_id: id.to_string(),
233 bridge_id: None,
234 connected_at: Utc::now(),
235 last_active: Utc::now(),
236 pid: identity.pid,
237 uid: identity.uid,
238 health: HealthStatus::Healthy,
239 consecutive_failures: 0,
240 };
241 self.active_bridges
242 .write()
243 .await
244 .insert(id.to_string(), status);
245 }
246
247 async fn update_active(&self, id: &str, bridge_id: Option<String>) {
248 let mut active = self.active_bridges.write().await;
249 if let Some(status) = active.get_mut(id) {
250 status.last_active = Utc::now();
251 status.health = HealthStatus::Healthy;
252 status.consecutive_failures = 0;
253 if bridge_id.is_some() {
254 status.bridge_id = bridge_id;
255 }
256 }
257 }
258
259 async fn remove_connection(&self, id: &str) {
260 self.active_bridges.write().await.remove(id);
261 }
262
263 pub async fn register_bridge(&self, bridge_id: &str, secret: &[u8]) -> Result<()> {
266 validate_bridge_id(bridge_id)?;
267
268 let paths = Paths::resolve()?;
269 let bridges_dir = paths.data_dir.join("bridges");
270 std::fs::create_dir_all(&bridges_dir)?;
271
272 let master_key = read_device_key(&paths.data_dir)?;
274
275 let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
277
278 let cipher = ChaCha20Poly1305::new(&bridge_key);
280
281 let mut nonce_bytes = [0u8; 12];
283 rand::rng().fill(&mut nonce_bytes);
284 let nonce = Nonce::from_slice(&nonce_bytes);
285
286 let ciphertext = cipher
287 .encrypt(nonce, secret)
288 .map_err(|e| anyhow::anyhow!("Encryption failed: {}", e))?;
289
290 let mut file_content = nonce_bytes.to_vec();
292 file_content.extend(ciphertext);
293
294 let file_path = bridges_dir.join(format!("{}.enc", bridge_id));
295 std::fs::write(&file_path, file_content)?;
296
297 #[cfg(unix)]
298 {
299 use std::os::unix::fs::PermissionsExt;
300 std::fs::set_permissions(&file_path, std::fs::Permissions::from_mode(0o600))?;
301 }
302
303 let mut creds = self.credentials.write().await;
305 creds.insert(bridge_id.to_string(), secret.to_vec());
306
307 info!("Registered credentials for bridge: {}", bridge_id);
308 Ok(())
309 }
310
311 pub async fn get_credentials_for(
314 &self,
315 bridge_id: &str,
316 identity: &PeerIdentity,
317 ) -> Result<Vec<u8>, BridgeError> {
318 if let Err(e) = validate_bridge_id(bridge_id) {
319 error!("Invalid bridge ID: {}", e);
320 return Err(BridgeError::AuthFailed("Invalid bridge ID".to_string()));
321 }
322
323 info!(
326 "Checking access for bridge: {} from {:?}",
327 bridge_id, identity
328 );
329
330 {
332 let creds = self.credentials.read().await;
333 if let Some(secret) = creds.get(bridge_id) {
334 return Ok(secret.clone());
335 }
336 }
337
338 match self.load_credentials_from_disk(bridge_id).await {
340 Ok(secret) => {
341 let mut creds = self.credentials.write().await;
343 creds.insert(bridge_id.to_string(), secret.clone());
344 Ok(secret)
345 }
346 Err(e) => {
347 error!("Failed to load credentials for {}: {}", bridge_id, e);
348 Err(BridgeError::NotRegistered)
349 }
350 }
351 }
352
353 async fn load_credentials_from_disk(&self, bridge_id: &str) -> Result<Vec<u8>> {
354 let paths = Paths::resolve()?;
355 let file_path = paths
356 .data_dir
357 .join("bridges")
358 .join(format!("{}.enc", bridge_id));
359
360 if !file_path.exists() {
361 anyhow::bail!("Credential file not found");
362 }
363
364 let file_content = std::fs::read(&file_path)?;
365 if file_content.len() < 12 {
366 anyhow::bail!("Invalid credential file format (too short)");
367 }
368
369 let (nonce_bytes, ciphertext) = file_content.split_at(12);
370 let nonce = Nonce::from_slice(nonce_bytes);
371
372 let master_key = read_device_key(&paths.data_dir)?;
374 let bridge_key = derive_bridge_key(&master_key, bridge_id)?;
375
376 let cipher = ChaCha20Poly1305::new(&bridge_key);
378 let plaintext = cipher
379 .decrypt(nonce, ciphertext)
380 .map_err(|e| anyhow::anyhow!("Decryption failed: {}", e))?;
381
382 Ok(plaintext)
383 }
384
385 pub async fn serve(self, socket_path: &str) -> anyhow::Result<()> {
387 let listener = BridgeServer::bind(socket_path)?;
388 let manager = self.clone();
389
390 info!("BridgeManager listening on {}", socket_path);
391
392 loop {
393 let conn = match listener.accept().await {
394 Ok(c) => c,
395 Err(e) => {
396 error!("Accept failed: {}", e);
397 continue;
398 }
399 };
400
401 let identity_result = {
403 #[cfg(unix)]
404 {
405 get_peer_identity(&conn)
406 }
407 #[cfg(windows)]
408 {
409 get_peer_identity(&conn)
410 }
411 };
412
413 let identity = match identity_result {
414 Ok(id) => {
415 #[cfg(unix)]
417 {
418 let current_uid = unsafe { libc::getuid() };
421 if let Some(peer_uid) = id.uid.filter(|&uid| uid != current_uid) {
422 error!(
423 "Rejected connection from UID {} (expected {})",
424 peer_uid, current_uid
425 );
426 continue;
427 }
428 }
429 id
430 }
431 Err(e) => {
432 error!("Peer identity verification failed: {}", e);
433 continue;
434 }
435 };
436
437 info!("Accepted connection from: {:?}", identity);
438
439 let connection_id = Uuid::new_v4().to_string();
440 manager.add_connection(&connection_id, &identity).await;
441
442 let handler = ConnectionHandler {
443 manager: manager.clone(),
444 identity,
445 connection_id: connection_id.clone(),
446 };
447
448 let connection_manager = manager.clone();
449 tokio::spawn(async move {
450 if let Err(e) = localgpt_bridge::handle_connection(conn, handler).await {
451 debug!("Connection handling finished/error: {:?}", e);
452 }
453 connection_manager.remove_connection(&connection_id).await;
454 });
455 }
456 }
457}
458
459impl Default for BridgeManager {
460 fn default() -> Self {
461 Self::new()
462 }
463}
464
465fn derive_bridge_key(master_key: &[u8; 32], bridge_id: &str) -> Result<Key> {
466 type HmacSha256 = Hmac<Sha256>;
467 let mut mac = <HmacSha256 as Mac>::new_from_slice(master_key)
469 .map_err(|e| anyhow::anyhow!("HMAC init failed: {}", e))?;
470
471 mac.update(b"bridge-key:");
472 mac.update(bridge_id.as_bytes());
473
474 let result = mac.finalize().into_bytes();
475 Ok(*Key::from_slice(&result))
477}
478
479#[derive(Clone)]
481struct ConnectionHandler {
482 manager: BridgeManager,
483 identity: PeerIdentity,
484 connection_id: String,
485}
486
487impl BridgeService for ConnectionHandler {
488 async fn get_version(self, _: context::Context) -> String {
489 self.manager.update_active(&self.connection_id, None).await;
490 localgpt_bridge::BRIDGE_PROTOCOL_VERSION.to_string()
491 }
492
493 async fn ping(self, _: context::Context) -> bool {
494 self.manager.update_active(&self.connection_id, None).await;
495 true
496 }
497
498 async fn get_credentials(
499 self,
500 _: context::Context,
501 bridge_id: String,
502 ) -> Result<Vec<u8>, BridgeError> {
503 self.manager
504 .update_active(&self.connection_id, Some(bridge_id.clone()))
505 .await;
506 self.manager
507 .get_credentials_for(&bridge_id, &self.identity)
508 .await
509 }
510
511 async fn chat(
512 self,
513 _: context::Context,
514 session_id: String,
515 message: String,
516 ) -> Result<String, BridgeError> {
517 self.manager.update_active(&self.connection_id, None).await;
518 let support = self
519 .manager
520 .agent_support
521 .as_ref()
522 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
523
524 let mut sessions = support.sessions.lock().await;
525
526 if let std::collections::hash_map::Entry::Vacant(entry) = sessions.entry(session_id.clone())
528 {
529 let agent_config = AgentConfig {
530 model: support.config.agent.default_model.clone(),
531 context_window: support.config.agent.context_window,
532 reserve_tokens: support.config.agent.reserve_tokens,
533 };
534 let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
535 .await
536 .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
537 agent
538 .new_session()
539 .await
540 .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
541 entry.insert(AgentSession { agent });
542 }
543
544 let session = sessions
545 .get_mut(&session_id)
546 .ok_or_else(|| BridgeError::Internal("Session unexpectedly missing".into()))?;
547 let response = session
548 .agent
549 .chat(&message)
550 .await
551 .map_err(|e| BridgeError::Internal(format!("Chat error: {}", e)))?;
552
553 if let Err(e) = session
554 .agent
555 .save_session_for_agent(BRIDGE_CLI_AGENT_ID)
556 .await
557 {
558 warn!("Failed to save bridge-cli session: {}", e);
559 }
560
561 Ok(response)
562 }
563
564 async fn new_session(
565 self,
566 _: context::Context,
567 session_id: String,
568 ) -> Result<String, BridgeError> {
569 self.manager.update_active(&self.connection_id, None).await;
570 let support = self
571 .manager
572 .agent_support
573 .as_ref()
574 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
575
576 let mut sessions = support.sessions.lock().await;
577
578 let agent_config = AgentConfig {
579 model: support.config.agent.default_model.clone(),
580 context_window: support.config.agent.context_window,
581 reserve_tokens: support.config.agent.reserve_tokens,
582 };
583 let mut agent = Agent::new(agent_config, &support.config, Arc::clone(&support.memory))
584 .await
585 .map_err(|e| BridgeError::Internal(format!("Failed to create agent: {}", e)))?;
586 agent
587 .new_session()
588 .await
589 .map_err(|e| BridgeError::Internal(format!("Failed to init session: {}", e)))?;
590
591 let model = agent.model().to_string();
592 let chunks = agent.memory_chunk_count();
593 sessions.insert(session_id, AgentSession { agent });
594
595 Ok(format!(
596 "New session created. Model: {} | Memory: {} chunks",
597 model, chunks
598 ))
599 }
600
601 async fn session_status(
602 self,
603 _: context::Context,
604 session_id: String,
605 ) -> Result<String, BridgeError> {
606 self.manager.update_active(&self.connection_id, None).await;
607 let support = self
608 .manager
609 .agent_support
610 .as_ref()
611 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
612
613 let sessions = support.sessions.lock().await;
614 let session = sessions
615 .get(&session_id)
616 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
617
618 let status = session.agent.session_status();
619 let mut output = String::new();
620 output.push_str(&format!("Session ID: {}\n", status.id));
621 output.push_str(&format!("Model: {}\n", session.agent.model()));
622 output.push_str(&format!("Messages: {}\n", status.message_count));
623 output.push_str(&format!("Context tokens: ~{}\n", status.token_count));
624 output.push_str(&format!("Compactions: {}\n", status.compaction_count));
625 output.push_str(&format!(
626 "Memory chunks: {}",
627 session.agent.memory_chunk_count()
628 ));
629
630 if status.api_input_tokens > 0 || status.api_output_tokens > 0 {
631 output.push_str(&format!(
632 "\nAPI tokens: {} in / {} out",
633 status.api_input_tokens, status.api_output_tokens
634 ));
635 }
636
637 Ok(output)
638 }
639
640 async fn set_model(
641 self,
642 _: context::Context,
643 session_id: String,
644 model: String,
645 ) -> Result<String, BridgeError> {
646 self.manager.update_active(&self.connection_id, None).await;
647 let support = self
648 .manager
649 .agent_support
650 .as_ref()
651 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
652
653 let mut sessions = support.sessions.lock().await;
654 let session = sessions
655 .get_mut(&session_id)
656 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
657
658 session
659 .agent
660 .set_model(&model)
661 .map_err(|e| BridgeError::Internal(format!("Failed to set model: {}", e)))?;
662
663 Ok(format!("Switched to model: {}", model))
664 }
665
666 async fn compact_session(
667 self,
668 _: context::Context,
669 session_id: String,
670 ) -> Result<String, BridgeError> {
671 self.manager.update_active(&self.connection_id, None).await;
672 let support = self
673 .manager
674 .agent_support
675 .as_ref()
676 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
677
678 let mut sessions = support.sessions.lock().await;
679 let session = sessions
680 .get_mut(&session_id)
681 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
682
683 let (before, after) = session
684 .agent
685 .compact_session()
686 .await
687 .map_err(|e| BridgeError::Internal(format!("Failed to compact: {}", e)))?;
688
689 Ok(format!(
690 "Session compacted. Token count: {} → {}",
691 before, after
692 ))
693 }
694
695 async fn clear_session(
696 self,
697 _: context::Context,
698 session_id: String,
699 ) -> Result<String, BridgeError> {
700 self.manager.update_active(&self.connection_id, None).await;
701 let support = self
702 .manager
703 .agent_support
704 .as_ref()
705 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
706
707 let mut sessions = support.sessions.lock().await;
708 let session = sessions
709 .get_mut(&session_id)
710 .ok_or_else(|| BridgeError::Internal("No active session".into()))?;
711
712 session.agent.clear_session();
713 Ok("Session cleared.".into())
714 }
715
716 async fn memory_search(
717 self,
718 _: context::Context,
719 query: String,
720 limit: u32,
721 ) -> Result<String, BridgeError> {
722 self.manager.update_active(&self.connection_id, None).await;
723 let support = self
724 .manager
725 .agent_support
726 .as_ref()
727 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
728
729 let results = support
730 .memory
731 .search(&query, limit as usize)
732 .map_err(|e| BridgeError::Internal(format!("Memory search failed: {}", e)))?;
733
734 if results.is_empty() {
735 return Ok(format!("No results found for '{}'", query));
736 }
737
738 let mut output = format!("Found {} results for '{}':\n", results.len(), query);
739 for (i, result) in results.iter().enumerate() {
740 output.push_str(&format!(
741 "\n{}. {} (lines {}-{})\n",
742 i + 1,
743 result.file,
744 result.line_start,
745 result.line_end
746 ));
747 output.push_str(&format!(" Score: {:.3}\n", result.score));
748 let preview: String = result.content.chars().take(200).collect();
749 let preview = preview.replace('\n', " ");
750 output.push_str(&format!(
751 " {}{}\n",
752 preview,
753 if result.content.len() > 200 {
754 "..."
755 } else {
756 ""
757 }
758 ));
759 }
760
761 Ok(output)
762 }
763
764 async fn memory_stats(self, _: context::Context) -> Result<String, BridgeError> {
765 self.manager.update_active(&self.connection_id, None).await;
766 let support = self
767 .manager
768 .agent_support
769 .as_ref()
770 .ok_or_else(|| BridgeError::NotSupported("Agent support not available".into()))?;
771
772 let stats = support
773 .memory
774 .stats()
775 .map_err(|e| BridgeError::Internal(format!("Failed to get stats: {}", e)))?;
776
777 let mut output = String::new();
778 output.push_str("Memory Statistics\n");
779 output.push_str("-----------------\n");
780 output.push_str(&format!("Workspace: {}\n", stats.workspace));
781 output.push_str(&format!("Total files: {}\n", stats.total_files));
782 output.push_str(&format!("Total chunks: {}\n", stats.total_chunks));
783 output.push_str(&format!("Index size: {} KB\n", stats.index_size_kb));
784 output.push_str("\nFiles:\n");
785 for file in &stats.files {
786 output.push_str(&format!(
787 " {} ({} chunks, {} lines)\n",
788 file.name, file.chunks, file.lines
789 ));
790 }
791
792 Ok(output)
793 }
794}
795
796fn validate_bridge_id(id: &str) -> Result<()> {
797 if id.is_empty() {
798 anyhow::bail!("Bridge ID cannot be empty");
799 }
800 if id.len() > 64 {
801 anyhow::bail!("Bridge ID is too long (max 64 chars)");
802 }
803 if !id
804 .chars()
805 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
806 {
807 anyhow::bail!("Bridge ID contains invalid characters. Allowed: a-z, A-Z, 0-9, -, _");
808 }
809 Ok(())
810}
811
812#[cfg(test)]
813mod tests {
814 use super::*;
815
816 #[test]
817 fn test_health_status_serialization() {
818 let healthy = HealthStatus::Healthy;
819 assert_eq!(serde_json::to_string(&healthy).unwrap(), "\"healthy\"");
820
821 let degraded = HealthStatus::Degraded;
822 assert_eq!(serde_json::to_string(°raded).unwrap(), "\"degraded\"");
823
824 let unhealthy = HealthStatus::Unhealthy;
825 assert_eq!(serde_json::to_string(&unhealthy).unwrap(), "\"unhealthy\"");
826 }
827
828 #[test]
829 fn test_health_check_config_default() {
830 let config = HealthCheckConfig::default();
831 assert_eq!(config.check_interval, Duration::from_secs(30));
832 assert_eq!(config.degraded_threshold, Duration::from_secs(60));
833 assert_eq!(config.unhealthy_threshold, Duration::from_secs(120));
834 assert!(config.log_warnings);
835 }
836
837 #[tokio::test]
838 async fn test_bridge_status_initial_health() {
839 let manager = BridgeManager::new();
840 let identity = PeerIdentity {
841 pid: Some(1234),
842 uid: Some(1000),
843 gid: Some(1000),
844 };
845
846 manager.add_connection("test-conn", &identity).await;
847
848 let bridges = manager.get_active_bridges().await;
849 assert_eq!(bridges.len(), 1);
850 assert_eq!(bridges[0].health, HealthStatus::Healthy);
851 assert_eq!(bridges[0].consecutive_failures, 0);
852 }
853
854 #[tokio::test]
855 async fn test_update_active_resets_health() {
856 let manager = BridgeManager::new();
857 let identity = PeerIdentity {
858 pid: Some(1234),
859 uid: Some(1000),
860 gid: Some(1000),
861 };
862
863 manager.add_connection("test-conn", &identity).await;
864
865 {
867 let mut bridges = manager.active_bridges.write().await;
868 let status = bridges.get_mut("test-conn").unwrap();
869 status.health = HealthStatus::Unhealthy;
870 status.consecutive_failures = 5;
871 }
872
873 manager
875 .update_active("test-conn", Some("telegram".to_string()))
876 .await;
877
878 let bridges = manager.get_active_bridges().await;
879 assert_eq!(bridges[0].health, HealthStatus::Healthy);
880 assert_eq!(bridges[0].consecutive_failures, 0);
881 assert_eq!(bridges[0].bridge_id, Some("telegram".to_string()));
882 }
883
884 #[tokio::test]
885 async fn test_health_check_degraded() {
886 let config = HealthCheckConfig {
887 check_interval: Duration::from_secs(30),
888 degraded_threshold: Duration::from_secs(5),
889 unhealthy_threshold: Duration::from_secs(10),
890 log_warnings: false,
891 };
892 let manager = BridgeManager::with_health_config(config);
893 let identity = PeerIdentity {
894 pid: Some(1234),
895 uid: Some(1000),
896 gid: Some(1000),
897 };
898
899 manager.add_connection("test-conn", &identity).await;
900
901 {
903 let mut bridges = manager.active_bridges.write().await;
904 let status = bridges.get_mut("test-conn").unwrap();
905 status.last_active = Utc::now() - chrono::Duration::seconds(7);
907 }
908
909 manager.check_bridge_health().await;
911
912 let bridges = manager.get_active_bridges().await;
913 assert_eq!(bridges[0].health, HealthStatus::Degraded);
914 assert_eq!(bridges[0].consecutive_failures, 1);
915 }
916
917 #[tokio::test]
918 async fn test_health_check_unhealthy() {
919 let config = HealthCheckConfig {
920 check_interval: Duration::from_secs(30),
921 degraded_threshold: Duration::from_secs(5),
922 unhealthy_threshold: Duration::from_secs(10),
923 log_warnings: false,
924 };
925 let manager = BridgeManager::with_health_config(config);
926 let identity = PeerIdentity {
927 pid: Some(1234),
928 uid: Some(1000),
929 gid: Some(1000),
930 };
931
932 manager.add_connection("test-conn", &identity).await;
933
934 {
936 let mut bridges = manager.active_bridges.write().await;
937 let status = bridges.get_mut("test-conn").unwrap();
938 status.last_active = Utc::now() - chrono::Duration::seconds(15);
940 }
941
942 manager.check_bridge_health().await;
944
945 let bridges = manager.get_active_bridges().await;
946 assert_eq!(bridges[0].health, HealthStatus::Unhealthy);
947 assert_eq!(bridges[0].consecutive_failures, 1);
948 }
949
950 #[tokio::test]
951 async fn test_health_check_consecutive_failures() {
952 let config = HealthCheckConfig {
953 check_interval: Duration::from_secs(30),
954 degraded_threshold: Duration::from_secs(5),
955 unhealthy_threshold: Duration::from_secs(10),
956 log_warnings: false,
957 };
958 let manager = BridgeManager::with_health_config(config);
959 let identity = PeerIdentity {
960 pid: Some(1234),
961 uid: Some(1000),
962 gid: Some(1000),
963 };
964
965 manager.add_connection("test-conn", &identity).await;
966
967 {
969 let mut bridges = manager.active_bridges.write().await;
970 let status = bridges.get_mut("test-conn").unwrap();
971 status.last_active = Utc::now() - chrono::Duration::seconds(15);
972 }
973
974 manager.check_bridge_health().await;
976 manager.check_bridge_health().await;
977 manager.check_bridge_health().await;
978
979 let bridges = manager.get_active_bridges().await;
980 assert_eq!(bridges[0].consecutive_failures, 3);
981 }
982
983 #[tokio::test]
984 async fn test_health_check_healthy_resets_failures() {
985 let config = HealthCheckConfig {
986 check_interval: Duration::from_secs(30),
987 degraded_threshold: Duration::from_secs(5),
988 unhealthy_threshold: Duration::from_secs(10),
989 log_warnings: false,
990 };
991 let manager = BridgeManager::with_health_config(config);
992 let identity = PeerIdentity {
993 pid: Some(1234),
994 uid: Some(1000),
995 gid: Some(1000),
996 };
997
998 manager.add_connection("test-conn", &identity).await;
999
1000 {
1002 let mut bridges = manager.active_bridges.write().await;
1003 let status = bridges.get_mut("test-conn").unwrap();
1004 status.consecutive_failures = 5;
1005 status.health = HealthStatus::Unhealthy;
1006 }
1007
1008 manager.check_bridge_health().await;
1011
1012 let bridges = manager.get_active_bridges().await;
1013 assert_eq!(bridges[0].health, HealthStatus::Healthy);
1014 assert_eq!(bridges[0].consecutive_failures, 0);
1015 }
1016
1017 #[tokio::test]
1018 async fn test_remove_connection() {
1019 let manager = BridgeManager::new();
1020 let identity = PeerIdentity {
1021 pid: Some(1234),
1022 uid: Some(1000),
1023 gid: Some(1000),
1024 };
1025
1026 manager.add_connection("test-conn", &identity).await;
1027 assert_eq!(manager.get_active_bridges().await.len(), 1);
1028
1029 manager.remove_connection("test-conn").await;
1030 assert_eq!(manager.get_active_bridges().await.len(), 0);
1031 }
1032
1033 #[test]
1034 fn test_validate_bridge_id() {
1035 assert!(validate_bridge_id("telegram").is_ok());
1036 assert!(validate_bridge_id("discord-bot").is_ok());
1037 assert!(validate_bridge_id("whatsapp_2").is_ok());
1038 assert!(validate_bridge_id("bridge123").is_ok());
1039
1040 assert!(validate_bridge_id("").is_err());
1041 assert!(validate_bridge_id(&"x".repeat(65)).is_err());
1042 assert!(validate_bridge_id("bridge!@#").is_err());
1043 assert!(validate_bridge_id("bridge name").is_err());
1044 }
1045}