1use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use anyhow::{bail, Context, Result};
14use chrono::Utc;
15use parking_lot::RwLock;
16use tokio::sync::Mutex;
17
18use super::conversation_buffer::{ConversationBuffer, ConversationTurn};
19use super::knowledge_bridge::KnowledgeBridge;
20use super::{
21 detection::{self, PathMatcher, Topic},
22 Space, SpaceId, SpaceSource,
23};
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::state_store::StateStore;
26
/// Age threshold in days: `archive_stale` archives non-default Spaces whose
/// `last_active_at` is older than this many days before now.
const MAX_ARCHIVE_AGE_DAYS: i64 = 30;
// Not referenced in this file (hence the lint allow); the runtime root is
// built by `SpaceManager::default_root_dir` instead.
#[allow(dead_code)]
const DEFAULT_WORKSPACE_DIR: &str = ".oxios/spaces";
30
31#[derive(thiserror::Error, Debug)]
33pub enum SpaceManagerError {
34 #[error("Space not found: {0}")]
36 NotFound(SpaceId),
37 #[error("Cannot merge a Space with itself")]
39 SelfMerge,
40 #[error("Space is private and cannot be accessed: {0}")]
42 Private(SpaceId),
43}
44
45impl SpaceManagerError {
46 pub fn is_fatal(&self) -> bool {
48 matches!(self, Self::SelfMerge)
49 }
50}
51
52pub struct SpaceManager {
54 spaces: RwLock<HashMap<SpaceId, Space>>,
56 current_space_id: RwLock<SpaceId>,
58 state_store: Arc<StateStore>,
60 #[allow(dead_code)]
63 event_bus: EventBus,
64 path_matcher: RwLock<PathMatcher>,
66 buffer: Arc<Mutex<ConversationBuffer>>,
68 knowledge_bridge: Option<Arc<KnowledgeBridge>>,
70 root_dir: PathBuf,
72 #[allow(dead_code)]
75 turns_since_topic_check: Mutex<usize>,
76}
77
78fn default_space_id() -> SpaceId {
80 *crate::space::DEFAULT_SPACE_ID
81 .get_or_init(|| uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap())
82}
83
84impl SpaceManager {
85 #[allow(missing_docs)]
87 #[cfg(test)]
88 pub fn default_space_id_for_tests() -> SpaceId {
89 uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap()
91 }
92}
93
94impl SpaceManager {
95 pub async fn new(state_store: Arc<StateStore>, event_bus: EventBus) -> Result<Self> {
99 let root_dir = Self::default_root_dir();
100 let this = Self {
101 spaces: RwLock::new(HashMap::new()),
102 current_space_id: RwLock::new(default_space_id()),
103 state_store,
104 event_bus,
105 path_matcher: RwLock::new(PathMatcher::default()),
106 buffer: Arc::new(Mutex::new(ConversationBuffer::default())),
107 knowledge_bridge: None,
108 root_dir,
109 turns_since_topic_check: Mutex::new(0),
110 };
111
112 this.load_spaces().await?;
113 this.ensure_default_space().await?;
114 this.reindex_path_matcher();
115
116 Ok(this)
117 }
118
119 pub fn set_knowledge_bridge(&mut self, bridge: Arc<KnowledgeBridge>) {
121 self.knowledge_bridge = Some(bridge);
122 }
123
124 fn default_root_dir() -> PathBuf {
126 dirs::home_dir()
127 .unwrap_or_else(|| PathBuf::from("."))
128 .join(".oxios")
129 .join("spaces")
130 }
131
132 async fn load_spaces(&self) -> Result<()> {
134 let spaces_dir = &self.root_dir;
135
136 if !spaces_dir.exists() {
137 std::fs::create_dir_all(spaces_dir)?;
138 return Ok(());
139 }
140
141 let index_path = spaces_dir.join("_index.json");
143 if index_path.exists() {
144 let ids: Vec<SpaceId> = match self.state_store.load_json("_spaces", "_index.json").await
145 {
146 Ok(Some(ids)) => ids,
147 Ok(None) => Vec::new(),
148 Err(e) => {
149 tracing::warn!(error = %e, "Failed to load Space index, starting fresh");
150 Vec::new()
151 }
152 };
153 for id in ids {
154 let path = spaces_dir.join(id.to_string()).join("space.json");
155 if path.exists() {
156 if let Ok(space) = self.load_space_from_file(&path).await {
157 self.spaces.write().insert(space.id, space);
158 }
159 }
160 }
161 }
162
163 tracing::info!(count = self.spaces.read().len(), "Loaded Spaces from disk");
164 Ok(())
165 }
166
167 async fn load_space_from_file(&self, path: &PathBuf) -> Result<Space> {
169 let content =
170 std::fs::read_to_string(path).with_context(|| format!("reading {}", path.display()))?;
171 let space: Space = serde_json::from_str(&content)
172 .with_context(|| format!("parsing {}", path.display()))?;
173 Ok(space)
174 }
175
176 #[allow(clippy::await_holding_lock)]
178 async fn ensure_default_space(&self) -> Result<()> {
179 let spaces = self.spaces.read();
180 if spaces.contains_key(&default_space_id()) {
181 return Ok(());
182 }
183 drop(spaces);
184
185 let default = Space {
187 id: default_space_id(),
188 name: String::new(), source: SpaceSource::Manual,
190 paths: Vec::new(),
191 workspace_dir: self.default_workspace_dir(&default_space_id()),
192 tags: Vec::new(),
193 active: true,
194 created_at: Utc::now(),
195 last_active_at: Utc::now(),
196 interaction_count: 0,
197 knowledge_visible: true,
198 };
199
200 self.add_space(default).await
201 }
202
203 fn default_workspace_dir(&self, space_id: &SpaceId) -> PathBuf {
205 self.root_dir.join(space_id.to_string()).join("workspace")
206 }
207
208 async fn add_space(&self, mut space: Space) -> Result<()> {
210 let ws_dir = &space.workspace_dir;
212 if !ws_dir.exists() {
213 std::fs::create_dir_all(ws_dir)?;
214 }
215
216 self.save_space(&space).await?;
218
219 let mut spaces = self.spaces.write();
221 if space.active {
222 for s in spaces.values_mut() {
224 s.deactivate();
225 }
226 space.activate();
227 }
228 spaces.insert(space.id, space);
229
230 drop(spaces);
232 self.reindex_path_matcher();
233
234 Ok(())
235 }
236
237 async fn save_space(&self, space: &Space) -> Result<()> {
239 let space_dir = self.root_dir.join(space.id.to_string());
240 let space_file = space_dir.join("space.json");
241
242 if !space_dir.exists() {
243 std::fs::create_dir_all(&space_dir)?;
244 }
245
246 let json = serde_json::to_string_pretty(space)?;
247 std::fs::write(&space_file, json)?;
248
249 self.save_index().await?;
251
252 Ok(())
253 }
254
255 async fn save_index(&self) -> Result<()> {
257 let ids: Vec<SpaceId> = self.spaces.read().keys().cloned().collect();
258 let index_path = self.root_dir.join("_index.json");
259 let json = serde_json::to_string_pretty(&ids)?;
260 std::fs::write(index_path, json)?;
261 Ok(())
262 }
263
264 fn reindex_path_matcher(&self) {
266 let spaces = self.spaces.read();
267 let mut matcher = self.path_matcher.write();
268 *matcher = PathMatcher::default();
269 for space in spaces.values() {
270 matcher.register(space);
271 }
272 }
273
274 #[allow(clippy::await_holding_lock)]
281 pub async fn detect_or_create(
282 &self,
283 message: &str,
284 turns: &[ConversationTurn],
285 ) -> Result<SpaceId> {
286 let spaces = self.spaces.read().clone();
287 let spaces_vec: Vec<_> = spaces.into_values().collect();
288
289 if let Some(path) = detection::extract_filesystem_path(message) {
291 let matched_space_id = {
293 let matcher = self.path_matcher.read();
294 matcher.find_space(&path)
295 };
296
297 if let Some(space_id) = matched_space_id {
298 self.activate(&space_id).await?;
299 return Ok(space_id);
300 }
301
302 let name = detection::path_name(&path);
304 let mut space = Space::from_path(&path);
305 space.name = name;
306 space.workspace_dir = self.default_workspace_dir(&space.id);
307 space.tags.push(path.to_string_lossy().to_string());
308
309 self.add_space(space).await?;
310 let space_id = self.current_space_id();
311
312 self.event_bus.publish(KernelEvent::SpaceCreated {
313 space_id,
314 name: "unnamed".to_string(),
315 source: "auto_resource".to_string(),
316 })?;
317
318 return Ok(space_id);
319 }
320
321 if let Some(space_id) = detection::match_keywords(message, &spaces_vec) {
323 self.activate(&space_id).await?;
324 return Ok(space_id);
325 }
326
327 let should_check = ConversationBuffer::should_check_topic_from_messages(turns, 3);
329 if should_check {
330 let topic = detection::classify_topic_stub(message);
331
332 if topic.is_clear() {
333 if self.is_in_default_space() {
335 let new_space = self.promote_from_default(&topic.name).await?;
337 return Ok(new_space);
338 }
339
340 if self.topic_shifted(&topic) {
342 if let Some(space_id) = self.find_by_topic(&topic.name) {
343 self.activate(&space_id).await?;
344 return Ok(space_id);
345 }
346
347 let space = self.create_from_topic(&topic.name).await?;
349 return Ok(space.id);
350 }
351 }
352 }
353
354 Ok(self.current_space_id())
356 }
357
358 fn topic_shifted(&self, new_topic: &Topic) -> bool {
360 let current = self.current_space();
361 if let Some(space) = current {
362 if space.is_default() {
363 return true; }
365 let current_lower = space.name.to_lowercase();
367 let new_lower = new_topic.name.to_lowercase();
368 !current_lower.is_empty() && current_lower != new_lower
369 } else {
370 true
371 }
372 }
373
374 fn find_by_topic(&self, topic: &str) -> Option<SpaceId> {
376 let spaces = self.spaces.read();
377 let topic_lower = topic.to_lowercase();
378
379 for space in spaces.values() {
381 if space.name.to_lowercase() == topic_lower {
382 return Some(space.id);
383 }
384 }
385
386 for space in spaces.values() {
388 for tag in &space.tags {
389 if tag.to_lowercase() == topic_lower {
390 return Some(space.id);
391 }
392 }
393 }
394
395 None
396 }
397
398 async fn promote_from_default(&self, topic: &str) -> Result<SpaceId> {
403 let default_id = default_space_id();
404
405 {
407 let mut spaces = self.spaces.write();
408 if let Some(default) = spaces.get_mut(&default_id) {
409 default.name = String::new();
410 default.deactivate();
411 }
412 }
413
414 let mut new_space = Space::from_topic(topic);
416 new_space.workspace_dir = self.default_workspace_dir(&new_space.id);
417 new_space.active = true;
418
419 let new_id = new_space.id;
420 self.add_space(new_space).await?;
421
422 *self.current_space_id.write() = new_id;
424
425 self.event_bus.publish(KernelEvent::SpaceActivated {
426 space_id: new_id,
427 name: topic.to_string(),
428 })?;
429
430 tracing::info!(topic, "Promoted default Space to named Space");
431 Ok(new_id)
432 }
433
434 pub async fn create_from_topic(&self, topic: &str) -> Result<Space> {
436 let mut space = Space::from_topic(topic);
437 space.workspace_dir = self.default_workspace_dir(&space.id);
438
439 space.add_tag("topic");
441 space.add_tag(topic);
442
443 self.add_space(space.clone()).await?;
444
445 self.event_bus.publish(KernelEvent::SpaceCreated {
446 space_id: space.id,
447 name: space.name.clone(),
448 source: "auto_topic".to_string(),
449 })?;
450
451 Ok(space)
452 }
453
454 pub async fn create_from_path(&self, name: &str, path: &Path) -> Result<Space> {
456 let mut space = Space::from_path(path);
457 if !name.is_empty() {
458 space.name = name.to_string();
459 }
460 space.workspace_dir = self.default_workspace_dir(&space.id);
461
462 self.add_space(space.clone()).await?;
463
464 self.event_bus.publish(KernelEvent::SpaceCreated {
465 space_id: space.id,
466 name: space.name.clone(),
467 source: "auto_resource".to_string(),
468 })?;
469
470 Ok(space)
471 }
472
473 pub async fn activate(&self, space_id: &SpaceId) -> Result<()> {
475 {
476 let mut spaces = self.spaces.write();
477 for (id, space) in spaces.iter_mut() {
478 if *id == *space_id {
479 space.activate();
480 space.touch();
481 } else {
482 space.deactivate();
483 }
484 }
485 }
486 *self.current_space_id.write() = *space_id;
487
488 let space = self.current_space();
489 let (id, name) = if let Some(s) = space {
490 (s.id, s.name.clone())
491 } else {
492 (*space_id, String::new())
493 };
494
495 self.save_space(&Space {
496 id,
497 name: name.clone(),
498 source: SpaceSource::Manual,
499 paths: Vec::new(),
500 workspace_dir: self.default_workspace_dir(&id),
501 tags: Vec::new(),
502 active: true,
503 created_at: Utc::now(),
504 last_active_at: Utc::now(),
505 interaction_count: 1,
506 knowledge_visible: true,
507 })
508 .await
509 .ok(); self.event_bus.publish(KernelEvent::SpaceActivated {
512 space_id: *space_id,
513 name,
514 })?;
515
516 Ok(())
517 }
518
519 pub async fn get_space(&self, space_id: &SpaceId) -> Result<Option<Space>> {
521 Ok(self.spaces.read().get(space_id).cloned())
522 }
523
524 pub fn list(&self) -> Vec<Space> {
526 self.spaces.read().values().cloned().collect()
527 }
528
529 pub fn current_space_id(&self) -> SpaceId {
531 *self.current_space_id.read()
532 }
533
    /// Returns the id of the default Space (same value as the module-level
    /// `default_space_id()` helper).
    pub fn default_space_id(&self) -> SpaceId {
        default_space_id()
    }
538
539 pub fn current_space(&self) -> Option<Space> {
541 let current_id = self.current_space_id();
542 self.spaces.read().get(¤t_id).cloned()
543 }
544
545 pub fn is_in_default_space(&self) -> bool {
547 let current = self.current_space();
548 current.map(|s| s.is_default()).unwrap_or(true)
549 }
550
551 pub async fn merge_spaces(&self, survivor_id: SpaceId, absorbed_id: SpaceId) -> Result<()> {
556 if survivor_id == absorbed_id {
557 bail!(SpaceManagerError::SelfMerge);
558 }
559
560 let (mut survivor, absorbed) = {
562 let spaces = self.spaces.read();
563 let s = spaces.get(&survivor_id).cloned();
564 let a = spaces.get(&absorbed_id).cloned();
565 match (s, a) {
566 (Some(sv), Some(av)) => (sv, av),
567 _ => bail!(SpaceManagerError::NotFound(survivor_id)),
568 }
569 };
570
571 let entries_migrated = 0; survivor.last_active_at = Utc::now();
576 survivor.interaction_count += absorbed.interaction_count;
577
578 for tag in absorbed.tags {
580 survivor.add_tag(tag);
581 }
582
583 for path in absorbed.paths {
585 if !survivor.paths.contains(&path) {
586 survivor.paths.push(path);
587 }
588 }
589
590 self.save_space(&survivor).await?;
592
593 {
594 let mut spaces = self.spaces.write();
595 spaces.remove(&absorbed_id);
596 }
597
598 let absorbed_dir = self.root_dir.join(absorbed_id.to_string());
600 let archived_dir = self
601 .root_dir
602 .join("_archived")
603 .join(absorbed_id.to_string());
604 if absorbed_dir.exists() {
605 let _ = std::fs::create_dir_all(archived_dir.parent().unwrap());
606 let _ = std::fs::rename(&absorbed_dir, &archived_dir);
607 }
608
609 self.reindex_path_matcher();
611 self.save_index().await?;
612
613 self.event_bus.publish(KernelEvent::SpacesMerged {
615 survivor: survivor_id,
616 absorbed: absorbed_id,
617 entries_migrated,
618 })?;
619
620 tracing::info!(
621 survivor = %survivor_id,
622 absorbed = %absorbed_id,
623 "Spaces merged"
624 );
625
626 Ok(())
627 }
628
629 pub fn should_auto_merge(&self, a: &Space, b: &Space) -> bool {
631 if a.paths.is_empty() || b.paths.is_empty() {
633 return false;
634 }
635
636 let paths_overlap = a.paths.iter().any(|ap| {
637 b.paths
638 .iter()
639 .any(|bp| ap == bp || ap.starts_with(bp) || bp.starts_with(ap))
640 });
641
642 if !paths_overlap {
643 return false;
644 }
645
646 let a_tags: std::collections::HashSet<_> =
648 a.tags.iter().map(|t| t.to_lowercase()).collect();
649 let b_tags: std::collections::HashSet<_> =
650 b.tags.iter().map(|t| t.to_lowercase()).collect();
651
652 if a_tags.is_empty() && b_tags.is_empty() {
653 }
655
656 let both_low_activity = a.interaction_count < 5 && b.interaction_count < 5;
658
659 paths_overlap && both_low_activity
660 }
661
662 pub async fn archive_stale(&self) -> Result<Vec<SpaceId>> {
664 let cutoff = Utc::now() - chrono::Duration::days(MAX_ARCHIVE_AGE_DAYS);
665 let mut archived = Vec::new();
666
667 let stale_ids: Vec<SpaceId> = {
668 let spaces = self.spaces.read();
669 spaces
670 .values()
671 .filter(|s| s.id != default_space_id() && s.last_active_at < cutoff)
672 .map(|s| s.id)
673 .collect()
674 };
675
676 for id in stale_ids {
677 self.archive_space(&id).await?;
678 archived.push(id);
679 }
680
681 if !archived.is_empty() {
682 tracing::info!(count = archived.len(), "Archived stale Spaces");
683 }
684
685 Ok(archived)
686 }
687
688 async fn archive_space(&self, space_id: &SpaceId) -> Result<()> {
690 let space = {
691 let spaces = self.spaces.read();
692 spaces.get(space_id).cloned()
693 };
694
695 let space = match space {
696 Some(s) => s,
697 None => return Ok(()),
698 };
699
700 let src = self.root_dir.join(space_id.to_string());
702 let dst = self.root_dir.join("_archived").join(space_id.to_string());
703 if src.exists() {
704 std::fs::create_dir_all(dst.parent().unwrap())?;
705 std::fs::rename(&src, &dst)?;
706 }
707
708 {
710 let mut spaces = self.spaces.write();
711 spaces.remove(space_id);
712 }
713
714 self.save_index().await?;
715 self.reindex_path_matcher();
716
717 self.event_bus.publish(KernelEvent::SpaceArchived {
718 space_id: *space_id,
719 name: space.name,
720 })?;
721
722 Ok(())
723 }
724
725 pub async fn restore_from_archive(&self, space_id: &SpaceId) -> Result<()> {
727 let archived_dir = self.root_dir.join("_archived").join(space_id.to_string());
728
729 if !archived_dir.exists() {
730 bail!("Archived Space not found: {}", space_id);
731 }
732
733 let space_file = archived_dir.join("space.json");
735 let space: Space = if space_file.exists() {
736 serde_json::from_str(&std::fs::read_to_string(&space_file)?)?
737 } else {
738 bail!("Space data not found for {}", space_id);
739 };
740
741 let dst = self.root_dir.join(space_id.to_string());
743 std::fs::create_dir_all(&dst)?;
744 for entry in std::fs::read_dir(&archived_dir)? {
745 let entry = entry?;
746 let file_name = entry.file_name();
747 let src_file = archived_dir.join(&file_name);
748 let dst_file = dst.join(&file_name);
749 if src_file.is_file() {
750 std::fs::copy(&src_file, &dst_file)?;
751 }
752 }
753
754 self.add_space(space).await?;
756
757 let _ = std::fs::remove_dir_all(&archived_dir);
759
760 tracing::info!(space_id = %space_id, "Restored Space from archive");
761 Ok(())
762 }
763
764 pub fn knowledge_bridge(&self) -> Option<Arc<KnowledgeBridge>> {
766 self.knowledge_bridge.clone()
767 }
768
    /// Returns the directory under which Spaces, the index, and the archive
    /// are stored.
    pub fn root_dir(&self) -> &PathBuf {
        &self.root_dir
    }
773
774 pub fn buffer(&self) -> Arc<Mutex<ConversationBuffer>> {
776 self.buffer.clone()
777 }
778}
779
#[cfg(test)]
mod tests {
    use super::*;
    use crate::space::SpaceSource;

    /// Builds a state store rooted in a fresh temporary directory.
    // NOTE(review): `dir` is dropped when this function returns; if the
    // temporary directory is deleted on drop, the store outlives it — confirm.
    fn test_state_store() -> Arc<StateStore> {
        let dir = tempfile::tempdir().unwrap();
        let store = StateStore::new(dir.path().to_path_buf()).unwrap();
        Arc::new(store)
    }

    /// Builds an event bus with capacity 64.
    fn test_event_bus() -> EventBus {
        EventBus::new(64)
    }

    /// Builds a manager from a fresh store and bus.
    async fn new_manager() -> SpaceManager {
        let store = test_state_store();
        let bus = test_event_bus();
        SpaceManager::new(store, bus).await.unwrap()
    }

    /// Path used by the tests that create a path-based Space.
    fn project_path() -> PathBuf {
        PathBuf::from("/projects/oxios")
    }

    #[tokio::test]
    async fn test_ensure_default_space() {
        let manager = new_manager().await;

        let default = manager.get_space(&default_space_id()).await.unwrap();
        assert!(default.is_some());
        assert!(default.unwrap().is_default());
    }

    #[tokio::test]
    async fn test_create_from_path() {
        let manager = new_manager().await;

        let path = project_path();
        let space = manager.create_from_path("oxios", &path).await.unwrap();

        assert_eq!(space.name, "oxios");
        assert_eq!(space.paths, vec![path]);
        assert_eq!(space.source, SpaceSource::AutoResource);
    }

    #[tokio::test]
    async fn test_activate() {
        let manager = new_manager().await;

        let space = manager
            .create_from_path("oxios", &project_path())
            .await
            .unwrap();

        // Creating a Space must not change which Space is current.
        assert_eq!(manager.current_space_id(), default_space_id());

        manager.activate(&space.id).await.unwrap();
        assert_eq!(manager.current_space_id(), space.id);
    }

    #[tokio::test]
    async fn test_is_in_default_space() {
        let manager = new_manager().await;

        assert!(manager.is_in_default_space());

        let space = manager
            .create_from_path("oxios", &project_path())
            .await
            .unwrap();
        manager.activate(&space.id).await.unwrap();

        assert!(!manager.is_in_default_space());
    }

    #[tokio::test]
    async fn test_list() {
        let manager = new_manager().await;

        // Only the default Space exists initially.
        assert_eq!(manager.list().len(), 1);

        manager
            .create_from_path("oxios", &project_path())
            .await
            .unwrap();

        assert_eq!(manager.list().len(), 2);
    }

    #[tokio::test]
    async fn test_merge_spaces_self_error() {
        let manager = new_manager().await;

        let id = default_space_id();
        let result = manager.merge_spaces(id, id).await;
        assert!(result.is_err());

        let err = result.unwrap_err();
        assert!(matches!(
            err.downcast_ref(),
            Some(SpaceManagerError::SelfMerge)
        ));
    }

    #[tokio::test]
    async fn test_should_auto_merge() {
        let manager = new_manager().await;

        let path = project_path();

        let mut first = Space::from_path(&path);
        first.name = "oxios-dev".to_string();
        first.interaction_count = 2;

        let mut second = Space::from_path(&path);
        second.name = "oxios-bugfix".to_string();
        second.interaction_count = 3;

        // Same path and both low-activity: merge candidate.
        assert!(manager.should_auto_merge(&first, &second));

        // One Space above the activity threshold: no longer a candidate.
        first.interaction_count = 10;
        assert!(!manager.should_auto_merge(&first, &second));
    }
}