1use serde::{Deserialize, Serialize};
47use std::fs;
48use std::path::PathBuf;
49use std::time::{SystemTime, UNIX_EPOCH};
50
51#[derive(Serialize, Deserialize, Clone, Debug)]
52pub struct FunctionClaim {
53 pub file_path: String,
54 pub function_name: String,
55 pub node_id: Option<String>,
56 pub claimed_at: u64,
57}
58
59#[derive(Serialize, Deserialize, Clone, Debug)]
60pub struct SentinelClaims {
61 pub session_id: String,
62 pub agent_id: String,
63 pub pid: u32,
64 pub last_heartbeat: u64,
65 pub claims: Vec<FunctionClaim>,
66}
67
68#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
69pub enum ZoneMode {
70 Warn,
71 Block,
72}
73
74#[derive(Serialize, Deserialize, Clone, Debug)]
75pub struct ZoneRule {
76 pub zone_id: String,
77 pub session_id: String,
78 pub patterns: Vec<String>,
79 pub mode: ZoneMode,
80}
81
82#[derive(Serialize, Deserialize, Clone, Debug)]
83pub struct Collision {
84 pub file_path: String,
85 pub function_name: String,
86 pub held_by_session: String,
87 pub held_by_agent: String,
88}
89
90#[derive(Serialize, Deserialize, Clone, Debug)]
91pub struct SentinelMessage {
92 pub id: String,
93 pub from_session: String,
94 pub from_agent: String,
95 pub to_session: Option<String>,
96 pub content: String,
97 pub timestamp: u64,
98 pub read_by: Vec<String>,
99}
100
101pub struct SentinelManager {
102 base_dir: PathBuf,
103}
104
105impl SentinelManager {
106 pub fn new(base_dir: PathBuf) -> Self {
107 Self { base_dir }
108 }
109
110 fn claims_dir(&self) -> PathBuf {
111 self.base_dir.join("claims")
112 }
113
114 fn zones_dir(&self) -> PathBuf {
115 self.base_dir.join("zones")
116 }
117
118 fn collisions_marker(&self) -> PathBuf {
119 self.base_dir.join("collisions_pending")
120 }
121
122 fn messages_dir(&self) -> PathBuf {
123 self.base_dir.join("messages")
124 }
125
126 fn unread_marker(&self) -> PathBuf {
127 self.base_dir.join("unread_pending")
128 }
129
130 fn ensure_dirs(&self) {
131 let _ = fs::create_dir_all(self.claims_dir());
132 let _ = fs::create_dir_all(self.zones_dir());
133 let _ = fs::create_dir_all(self.messages_dir());
134 }
135
136 fn now() -> u64 {
137 SystemTime::now()
138 .duration_since(UNIX_EPOCH)
139 .unwrap_or_default()
140 .as_secs()
141 }
142
143 fn claim_path(&self, session_id: &str) -> PathBuf {
144 self.claims_dir().join(format!("{}.json", session_id))
145 }
146
147 fn atomic_write(path: &PathBuf, data: &[u8]) -> Result<(), String> {
148 let tmp = path.with_extension("tmp");
149 fs::write(&tmp, data).map_err(|e| format!("Write error: {}", e))?;
150 fs::rename(&tmp, path).map_err(|e| format!("Rename error: {}", e))?;
151 Ok(())
152 }
153
154 fn load_all_claims(&self) -> Vec<SentinelClaims> {
155 let dir = self.claims_dir();
156 let mut all = Vec::new();
157 if let Ok(entries) = fs::read_dir(&dir) {
158 for entry in entries.flatten() {
159 if entry.path().extension().is_some_and(|x| x == "json") {
160 if let Ok(content) = fs::read_to_string(entry.path()) {
161 if let Ok(claims) = serde_json::from_str::<SentinelClaims>(&content) {
162 all.push(claims);
163 }
164 }
165 }
166 }
167 }
168 all
169 }
170
171 fn is_pid_alive(pid: u32) -> bool {
172 use sysinfo::System;
173 let mut sys = System::new();
174 sys.refresh_processes(
175 sysinfo::ProcessesToUpdate::Some(&[sysinfo::Pid::from_u32(pid)]),
176 true,
177 );
178 sys.process(sysinfo::Pid::from_u32(pid)).is_some()
179 }
180
181 pub fn cleanup_stale(&self) -> usize {
182 self.ensure_dirs();
183 let now = Self::now();
184 let zombie_threshold = 3600;
185 let mut removed = 0;
186
187 let all = self.load_all_claims();
188 for claims in &all {
189 let pid_dead = !Self::is_pid_alive(claims.pid);
190 let zombie = now - claims.last_heartbeat > zombie_threshold;
191 if pid_dead || zombie {
192 let path = self.claim_path(&claims.session_id);
193 let _ = fs::remove_file(&path);
194 removed += 1;
195 }
196 }
197
198 if removed > 0 {
199 self.update_collision_marker();
200 }
201 removed
202 }
203
204 pub fn claim_functions(
205 &self,
206 session_id: &str,
207 agent_id: &str,
208 pid: u32,
209 file_path: &str,
210 functions: &[String],
211 ) -> Vec<Collision> {
212 self.ensure_dirs();
213
214 let path = self.claim_path(session_id);
215 let mut my_claims = if let Ok(content) = fs::read_to_string(&path) {
216 serde_json::from_str::<SentinelClaims>(&content).unwrap_or_else(|_| SentinelClaims {
217 session_id: session_id.to_string(),
218 agent_id: agent_id.to_string(),
219 pid,
220 last_heartbeat: Self::now(),
221 claims: Vec::new(),
222 })
223 } else {
224 SentinelClaims {
225 session_id: session_id.to_string(),
226 agent_id: agent_id.to_string(),
227 pid,
228 last_heartbeat: Self::now(),
229 claims: Vec::new(),
230 }
231 };
232
233 let others = self.load_all_claims();
234 let mut collisions = Vec::new();
235
236 for func_name in functions {
237 for other in &others {
238 if other.session_id == session_id {
239 continue;
240 }
241 for claim in &other.claims {
242 if claim.file_path == file_path && claim.function_name == *func_name {
243 collisions.push(Collision {
244 file_path: file_path.to_string(),
245 function_name: func_name.clone(),
246 held_by_session: other.session_id.clone(),
247 held_by_agent: other.agent_id.clone(),
248 });
249 }
250 }
251 }
252
253 let already = my_claims
254 .claims
255 .iter()
256 .any(|c| c.file_path == file_path && c.function_name == *func_name);
257 if !already {
258 my_claims.claims.push(FunctionClaim {
259 file_path: file_path.to_string(),
260 function_name: func_name.clone(),
261 node_id: None,
262 claimed_at: Self::now(),
263 });
264 }
265 }
266
267 my_claims.last_heartbeat = Self::now();
268 if let Ok(json) = serde_json::to_string_pretty(&my_claims) {
269 let _ = Self::atomic_write(&path, json.as_bytes());
270 }
271
272 self.update_collision_marker();
273 collisions
274 }
275
276 pub fn release_claims(&self, session_id: &str) {
277 let path = self.claim_path(session_id);
278 let _ = fs::remove_file(&path);
279 self.update_collision_marker();
280 }
281
282 pub fn release_file_claims(&self, session_id: &str, file_path: &str) {
283 let path = self.claim_path(session_id);
284 if let Ok(content) = fs::read_to_string(&path) {
285 if let Ok(mut claims) = serde_json::from_str::<SentinelClaims>(&content) {
286 claims.claims.retain(|c| c.file_path != file_path);
287 claims.last_heartbeat = Self::now();
288 if let Ok(json) = serde_json::to_string_pretty(&claims) {
289 let _ = Self::atomic_write(&path, json.as_bytes());
290 }
291 }
292 }
293 self.update_collision_marker();
294 }
295
296 pub fn check_zone(&self, session_id: &str, file_path: &str) -> Option<ZoneRule> {
297 let dir = self.zones_dir();
298 if let Ok(entries) = fs::read_dir(&dir) {
299 for entry in entries.flatten() {
300 if entry.path().extension().is_some_and(|x| x == "json") {
301 if let Ok(content) = fs::read_to_string(entry.path()) {
302 if let Ok(zone) = serde_json::from_str::<ZoneRule>(&content) {
303 if zone.session_id == session_id {
304 continue;
305 }
306 for pattern in &zone.patterns {
307 if file_matches_pattern(file_path, pattern) {
308 return Some(zone);
309 }
310 }
311 }
312 }
313 }
314 }
315 }
316 None
317 }
318
319 pub fn update_heartbeat(&self, session_id: &str) {
320 let path = self.claim_path(session_id);
321 if let Ok(content) = fs::read_to_string(&path) {
322 if let Ok(mut claims) = serde_json::from_str::<SentinelClaims>(&content) {
323 claims.last_heartbeat = Self::now();
324 if let Ok(json) = serde_json::to_string_pretty(&claims) {
325 let _ = Self::atomic_write(&path, json.as_bytes());
326 }
327 }
328 }
329 }
330
331 pub fn get_status(&self, session_id: &str) -> serde_json::Value {
332 self.ensure_dirs();
333 let all = self.load_all_claims();
334
335 let own = all.iter().find(|c| c.session_id == session_id);
336 let others: Vec<&SentinelClaims> =
337 all.iter().filter(|c| c.session_id != session_id).collect();
338
339 let mut collisions = Vec::new();
340 if let Some(mine) = own {
341 for my_claim in &mine.claims {
342 for other in &others {
343 for their_claim in &other.claims {
344 if my_claim.file_path == their_claim.file_path
345 && my_claim.function_name == their_claim.function_name
346 {
347 collisions.push(serde_json::json!({
348 "file": my_claim.file_path,
349 "function": my_claim.function_name,
350 "held_by_session": other.session_id,
351 "held_by_agent": other.agent_id,
352 }));
353 }
354 }
355 }
356 }
357 }
358
359 let zones = self.list_zones();
360
361 serde_json::json!({
362 "session_id": session_id,
363 "own_claims": own.map(|c| &c.claims).unwrap_or(&Vec::new())
364 .iter()
365 .map(|c| serde_json::json!({
366 "file": c.file_path,
367 "function": c.function_name,
368 "claimed_at": c.claimed_at,
369 }))
370 .collect::<Vec<_>>(),
371 "other_sessions": others.iter().map(|o| serde_json::json!({
372 "session_id": o.session_id,
373 "agent_id": o.agent_id,
374 "pid": o.pid,
375 "claim_count": o.claims.len(),
376 "last_heartbeat": o.last_heartbeat,
377 })).collect::<Vec<_>>(),
378 "collisions": collisions,
379 "zones": zones.iter().map(|z| serde_json::json!({
380 "zone_id": z.zone_id,
381 "session_id": z.session_id,
382 "patterns": z.patterns,
383 "mode": format!("{:?}", z.mode),
384 })).collect::<Vec<_>>(),
385 "total_active_sessions": all.len(),
386 })
387 }
388
389 pub fn create_zone(&self, session_id: &str, patterns: Vec<String>, mode: ZoneMode) -> ZoneRule {
390 self.ensure_dirs();
391 let zone_id = format!("zone-{}", &uuid::Uuid::new_v4().to_string()[..8]);
392 let zone = ZoneRule {
393 zone_id: zone_id.clone(),
394 session_id: session_id.to_string(),
395 patterns,
396 mode,
397 };
398 let path = self.zones_dir().join(format!("{}.json", zone_id));
399 if let Ok(json) = serde_json::to_string_pretty(&zone) {
400 let _ = Self::atomic_write(&path, json.as_bytes());
401 }
402 zone
403 }
404
405 pub fn list_zones(&self) -> Vec<ZoneRule> {
406 let dir = self.zones_dir();
407 let mut zones = Vec::new();
408 if let Ok(entries) = fs::read_dir(&dir) {
409 for entry in entries.flatten() {
410 if entry.path().extension().is_some_and(|x| x == "json") {
411 if let Ok(content) = fs::read_to_string(entry.path()) {
412 if let Ok(zone) = serde_json::from_str::<ZoneRule>(&content) {
413 zones.push(zone);
414 }
415 }
416 }
417 }
418 }
419 zones
420 }
421
422 pub fn send_message(
423 &self,
424 from_session: &str,
425 from_agent: &str,
426 to_session: Option<&str>,
427 content: &str,
428 ) -> SentinelMessage {
429 self.ensure_dirs();
430 let msg = SentinelMessage {
431 id: format!("msg-{}", &uuid::Uuid::new_v4().to_string()[..8]),
432 from_session: from_session.to_string(),
433 from_agent: from_agent.to_string(),
434 to_session: to_session.map(|s| s.to_string()),
435 content: content.to_string(),
436 timestamp: Self::now(),
437 read_by: vec![from_session.to_string()],
438 };
439 let path = self.messages_dir().join(format!("{}.json", msg.id));
440 if let Ok(json) = serde_json::to_string_pretty(&msg) {
441 let _ = Self::atomic_write(&path, json.as_bytes());
442 }
443 self.update_unread_marker();
444 msg
445 }
446
447 pub fn read_messages(&self, session_id: &str, limit: usize) -> Vec<(SentinelMessage, bool)> {
448 self.ensure_dirs();
449 let dir = self.messages_dir();
450 let mut messages = Vec::new();
451
452 if let Ok(entries) = fs::read_dir(&dir) {
453 for entry in entries.flatten() {
454 if entry.path().extension().is_some_and(|x| x == "json") {
455 if let Ok(content) = fs::read_to_string(entry.path()) {
456 if let Ok(msg) = serde_json::from_str::<SentinelMessage>(&content) {
457 let dominated = msg.to_session.is_none()
458 || msg.to_session.as_deref() == Some(session_id)
459 || msg.from_session == session_id;
460 if dominated {
461 messages.push((entry.path(), msg));
462 }
463 }
464 }
465 }
466 }
467 }
468
469 messages.sort_by(|a, b| b.1.timestamp.cmp(&a.1.timestamp));
470
471 let mut result = Vec::new();
472 for entry in &mut messages {
473 let was_unread = !entry.1.read_by.contains(&session_id.to_string());
474 if was_unread {
475 entry.1.read_by.push(session_id.to_string());
476 if let Ok(json) = serde_json::to_string_pretty(&entry.1) {
477 let _ = Self::atomic_write(&entry.0, json.as_bytes());
478 }
479 }
480 result.push((entry.1.clone(), was_unread));
481 }
482
483 self.update_unread_marker();
484 result.into_iter().take(limit).collect()
485 }
486
487 pub fn get_unread_messages(&self, session_id: &str) -> Vec<SentinelMessage> {
488 let dir = self.messages_dir();
489 let mut unread = Vec::new();
490 if let Ok(entries) = fs::read_dir(&dir) {
491 for entry in entries.flatten() {
492 if entry.path().extension().is_some_and(|x| x == "json") {
493 if let Ok(content) = fs::read_to_string(entry.path()) {
494 if let Ok(msg) = serde_json::from_str::<SentinelMessage>(&content) {
495 let dominated = msg.to_session.is_none()
496 || msg.to_session.as_deref() == Some(session_id);
497 if dominated
498 && msg.from_session != session_id
499 && !msg.read_by.contains(&session_id.to_string())
500 {
501 unread.push(msg);
502 }
503 }
504 }
505 }
506 }
507 }
508 unread.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
509 unread
510 }
511
512 pub fn unread_count(&self, session_id: &str) -> u64 {
513 let dir = self.messages_dir();
514 let mut count = 0u64;
515 if let Ok(entries) = fs::read_dir(&dir) {
516 for entry in entries.flatten() {
517 if entry.path().extension().is_some_and(|x| x == "json") {
518 if let Ok(content) = fs::read_to_string(entry.path()) {
519 if let Ok(msg) = serde_json::from_str::<SentinelMessage>(&content) {
520 let dominated = msg.to_session.is_none()
521 || msg.to_session.as_deref() == Some(session_id);
522 if dominated
523 && msg.from_session != session_id
524 && !msg.read_by.contains(&session_id.to_string())
525 {
526 count += 1;
527 }
528 }
529 }
530 }
531 }
532 }
533 count
534 }
535
536 pub fn list_agents(&self) -> Vec<serde_json::Value> {
537 self.ensure_dirs();
538 self.cleanup_stale();
539 let all = self.load_all_claims();
540 all.iter()
541 .map(|c| {
542 serde_json::json!({
543 "session_id": c.session_id,
544 "agent_id": c.agent_id,
545 "pid": c.pid,
546 "claim_count": c.claims.len(),
547 "last_heartbeat": c.last_heartbeat,
548 "files": c.claims.iter()
549 .map(|cl| cl.file_path.clone())
550 .collect::<std::collections::HashSet<_>>()
551 .into_iter()
552 .collect::<Vec<_>>(),
553 })
554 })
555 .collect()
556 }
557
558 pub fn cleanup_old_messages(&self) -> usize {
559 let dir = self.messages_dir();
560 let now = Self::now();
561 let max_age = 3600;
562 let mut removed = 0;
563 if let Ok(entries) = fs::read_dir(&dir) {
564 for entry in entries.flatten() {
565 if entry.path().extension().is_some_and(|x| x == "json") {
566 if let Ok(content) = fs::read_to_string(entry.path()) {
567 if let Ok(msg) = serde_json::from_str::<SentinelMessage>(&content) {
568 if now - msg.timestamp > max_age {
569 let _ = fs::remove_file(entry.path());
570 removed += 1;
571 }
572 }
573 }
574 }
575 }
576 }
577 if removed > 0 {
578 self.update_unread_marker();
579 }
580 removed
581 }
582
583 fn update_unread_marker(&self) {
584 let dir = self.messages_dir();
585 let mut total_unread = 0u64;
586
587 let all_sessions = self.load_all_claims();
588 if let Ok(entries) = fs::read_dir(&dir) {
589 for entry in entries.flatten() {
590 if entry.path().extension().is_some_and(|x| x == "json") {
591 if let Ok(content) = fs::read_to_string(entry.path()) {
592 if let Ok(msg) = serde_json::from_str::<SentinelMessage>(&content) {
593 for sess in &all_sessions {
594 if sess.session_id == msg.from_session {
595 continue;
596 }
597 let dominated = msg.to_session.is_none()
598 || msg.to_session.as_deref() == Some(&sess.session_id);
599 if dominated && !msg.read_by.contains(&sess.session_id) {
600 total_unread += 1;
601 break;
602 }
603 }
604 }
605 }
606 }
607 }
608 }
609
610 let marker = self.unread_marker();
611 if total_unread > 0 {
612 let _ = fs::write(&marker, total_unread.to_string());
613 } else {
614 let _ = fs::remove_file(&marker);
615 }
616 }
617
618 fn update_collision_marker(&self) {
619 let all = self.load_all_claims();
620 let mut collision_count: u64 = 0;
621
622 for (i, a) in all.iter().enumerate() {
623 for b in all.iter().skip(i + 1) {
624 for ca in &a.claims {
625 for cb in &b.claims {
626 if ca.file_path == cb.file_path && ca.function_name == cb.function_name {
627 collision_count += 1;
628 }
629 }
630 }
631 }
632 }
633
634 let marker = self.collisions_marker();
635 if collision_count > 0 {
636 let _ = fs::write(&marker, collision_count.to_string());
637 } else {
638 let _ = fs::remove_file(&marker);
639 }
640 }
641}
642
643fn file_matches_pattern(file_path: &str, pattern: &str) -> bool {
644 let pattern = pattern.trim_end_matches('*');
645 file_path.starts_with(pattern)
646}
647
648#[cfg(test)]
649mod tests {
650 use super::*;
651 use std::path::PathBuf;
652
653 fn temp_dir() -> PathBuf {
654 let dir = std::env::temp_dir().join(format!("aura-zones-test-{}", uuid::Uuid::new_v4()));
655 let _ = fs::create_dir_all(&dir);
656 dir
657 }
658
659 #[test]
660 fn test_claim_and_detect_collision() {
661 let dir = temp_dir();
662 let mgr = SentinelManager::new(dir.clone());
663
664 let c1 = mgr.claim_functions("s1", "claude-1", 99999, "src/main.rs", &["foo".into()]);
665 assert!(c1.is_empty());
666
667 let c2 = mgr.claim_functions("s2", "claude-2", 99998, "src/main.rs", &["foo".into()]);
668 assert_eq!(c2.len(), 1);
669 assert_eq!(c2[0].function_name, "foo");
670 assert_eq!(c2[0].held_by_session, "s1");
671
672 let _ = fs::remove_dir_all(&dir);
673 }
674
675 #[test]
676 fn test_zone_warn_and_block() {
677 let dir = temp_dir();
678 let mgr = SentinelManager::new(dir.clone());
679
680 mgr.create_zone("s1", vec!["src/auth/".into()], ZoneMode::Block);
681
682 let zone = mgr.check_zone("s2", "src/auth/login.rs");
683 assert!(zone.is_some());
684 assert_eq!(zone.unwrap().mode, ZoneMode::Block);
685
686 let no_zone = mgr.check_zone("s2", "src/utils.rs");
687 assert!(no_zone.is_none());
688
689 let _ = fs::remove_dir_all(&dir);
690 }
691
692 #[test]
693 fn test_messaging_round_trip() {
694 let dir = temp_dir();
695 let mgr = SentinelManager::new(dir.clone());
696
697 mgr.claim_functions("s1", "claude-1", 99999, "x.rs", &[]);
698 mgr.claim_functions("s2", "claude-2", 99998, "y.rs", &[]);
699
700 mgr.send_message("s1", "claude-1", None, "hello from s1");
701
702 let unread = mgr.unread_count("s2");
703 assert_eq!(unread, 1);
704
705 let msgs = mgr.read_messages("s2", 10);
706 assert_eq!(msgs.len(), 1);
707 assert!(msgs[0].1); assert_eq!(msgs[0].0.content, "hello from s1");
709
710 let unread_after = mgr.unread_count("s2");
711 assert_eq!(unread_after, 0);
712
713 let _ = fs::remove_dir_all(&dir);
714 }
715
716 #[test]
717 fn test_release_claims() {
718 let dir = temp_dir();
719 let mgr = SentinelManager::new(dir.clone());
720
721 mgr.claim_functions("s1", "claude", 99999, "a.rs", &["bar".into()]);
722 mgr.release_claims("s1");
723
724 let c2 = mgr.claim_functions("s2", "claude-2", 99998, "a.rs", &["bar".into()]);
725 assert!(c2.is_empty());
726
727 let _ = fs::remove_dir_all(&dir);
728 }
729
730 #[test]
731 fn test_file_matches_pattern() {
732 assert!(file_matches_pattern("src/auth/login.rs", "src/auth/"));
733 assert!(file_matches_pattern("src/auth/login.rs", "src/auth/*"));
734 assert!(!file_matches_pattern("src/utils.rs", "src/auth/"));
735 }
736}