1use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use anyhow::{bail, Context, Result};
14use chrono::Utc;
15use parking_lot::RwLock;
16use tokio::sync::Mutex;
17
18use super::conversation_buffer::ConversationBuffer;
19use super::knowledge_bridge::KnowledgeBridge;
20use super::{
21 detection::{self, PathMatcher, Topic},
22 Space, SpaceId, SpaceSource,
23};
24use crate::event_bus::{EventBus, KernelEvent};
25use crate::state_store::StateStore;
26
/// Number of days without activity after which `archive_stale` treats a
/// non-default Space as stale.
const MAX_ARCHIVE_AGE_DAYS: i64 = 30;
/// Relative Spaces directory. Not referenced in the visible code (hence the
/// lint exemption); `default_root_dir` builds the same path from components.
#[allow(dead_code)]
const DEFAULT_WORKSPACE_DIR: &str = ".oxios/spaces";
30
/// Typed errors raised by [`SpaceManager`] operations.
#[derive(thiserror::Error, Debug)]
pub enum SpaceManagerError {
    /// No Space with the given id is known to the manager.
    #[error("Space not found: {0}")]
    NotFound(SpaceId),
    /// A merge was requested with the same Space as survivor and absorbed.
    #[error("Cannot merge a Space with itself")]
    SelfMerge,
    /// The Space is private. Not raised anywhere in the visible code.
    #[error("Space is private and cannot be accessed: {0}")]
    Private(SpaceId),
}
44
45impl SpaceManagerError {
46 pub fn is_fatal(&self) -> bool {
48 matches!(self, Self::SelfMerge)
49 }
50}
51
52pub struct SpaceManager {
54 spaces: RwLock<HashMap<SpaceId, Space>>,
56 current_space_id: RwLock<SpaceId>,
58 state_store: Arc<StateStore>,
60 #[allow(dead_code)]
63 event_bus: EventBus,
64 path_matcher: RwLock<PathMatcher>,
66 buffer: Arc<Mutex<ConversationBuffer>>,
68 knowledge_bridge: Option<Arc<KnowledgeBridge>>,
70 root_dir: PathBuf,
72 #[allow(dead_code)]
75 turns_since_topic_check: Mutex<usize>,
76}
77
78fn default_space_id() -> SpaceId {
80 *crate::space::DEFAULT_SPACE_ID
81 .get_or_init(|| uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap())
82}
83
84impl SpaceManager {
85 #[allow(missing_docs)]
87 #[cfg(test)]
88 pub fn default_space_id_for_tests() -> SpaceId {
89 uuid::Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap()
91 }
92}
93
94impl SpaceManager {
95 pub async fn new(state_store: Arc<StateStore>, event_bus: EventBus) -> Result<Self> {
99 let root_dir = Self::default_root_dir();
100 let this = Self {
101 spaces: RwLock::new(HashMap::new()),
102 current_space_id: RwLock::new(default_space_id()),
103 state_store,
104 event_bus,
105 path_matcher: RwLock::new(PathMatcher::default()),
106 buffer: Arc::new(Mutex::new(ConversationBuffer::default())),
107 knowledge_bridge: None,
108 root_dir,
109 turns_since_topic_check: Mutex::new(0),
110 };
111
112 this.load_spaces().await?;
113 this.ensure_default_space().await?;
114 this.reindex_path_matcher();
115
116 Ok(this)
117 }
118
119 pub fn set_knowledge_bridge(&mut self, bridge: Arc<KnowledgeBridge>) {
121 self.knowledge_bridge = Some(bridge);
122 }
123
124 fn default_root_dir() -> PathBuf {
126 dirs::home_dir()
127 .unwrap_or_else(|| PathBuf::from("."))
128 .join(".oxios")
129 .join("spaces")
130 }
131
132 async fn load_spaces(&self) -> Result<()> {
134 let spaces_dir = &self.root_dir;
135
136 if !spaces_dir.exists() {
137 std::fs::create_dir_all(spaces_dir)?;
138 return Ok(());
139 }
140
141 let index_path = spaces_dir.join("_index.json");
143 if index_path.exists() {
144 let ids: Vec<SpaceId> = match self.state_store.load_json("_spaces", "_index.json").await
145 {
146 Ok(Some(ids)) => ids,
147 Ok(None) => Vec::new(),
148 Err(e) => {
149 tracing::warn!(error = %e, "Failed to load Space index, starting fresh");
150 Vec::new()
151 }
152 };
153 for id in ids {
154 let path = spaces_dir.join(id.to_string()).join("space.json");
155 if path.exists() {
156 if let Ok(space) = self.load_space_from_file(&path).await {
157 self.spaces.write().insert(space.id, space);
158 }
159 }
160 }
161 }
162
163 tracing::info!(count = self.spaces.read().len(), "Loaded Spaces from disk");
164 Ok(())
165 }
166
167 async fn load_space_from_file(&self, path: &PathBuf) -> Result<Space> {
169 let content =
170 std::fs::read_to_string(path).with_context(|| format!("reading {}", path.display()))?;
171 let space: Space = serde_json::from_str(&content)
172 .with_context(|| format!("parsing {}", path.display()))?;
173 Ok(space)
174 }
175
176 #[allow(clippy::await_holding_lock)]
178 async fn ensure_default_space(&self) -> Result<()> {
179 let spaces = self.spaces.read();
180 if spaces.contains_key(&default_space_id()) {
181 return Ok(());
182 }
183 drop(spaces);
184
185 let default = Space {
187 id: default_space_id(),
188 name: String::new(), source: SpaceSource::Manual,
190 paths: Vec::new(),
191 workspace_dir: self.default_workspace_dir(&default_space_id()),
192 tags: Vec::new(),
193 active: true,
194 created_at: Utc::now(),
195 last_active_at: Utc::now(),
196 interaction_count: 0,
197 knowledge_visible: true,
198 };
199
200 self.add_space(default).await
201 }
202
203 fn default_workspace_dir(&self, space_id: &SpaceId) -> PathBuf {
205 self.root_dir.join(space_id.to_string()).join("workspace")
206 }
207
208 async fn add_space(&self, mut space: Space) -> Result<()> {
210 let ws_dir = &space.workspace_dir;
212 if !ws_dir.exists() {
213 std::fs::create_dir_all(ws_dir)?;
214 }
215
216 self.save_space(&space).await?;
218
219 let mut spaces = self.spaces.write();
221 if space.active {
222 for s in spaces.values_mut() {
224 s.deactivate();
225 }
226 space.activate();
227 }
228 spaces.insert(space.id, space);
229
230 drop(spaces);
232 self.reindex_path_matcher();
233
234 Ok(())
235 }
236
237 async fn save_space(&self, space: &Space) -> Result<()> {
239 let space_dir = self.root_dir.join(space.id.to_string());
240 let space_file = space_dir.join("space.json");
241
242 if !space_dir.exists() {
243 std::fs::create_dir_all(&space_dir)?;
244 }
245
246 let json = serde_json::to_string_pretty(space)?;
247 std::fs::write(&space_file, json)?;
248
249 self.save_index().await?;
251
252 Ok(())
253 }
254
255 async fn save_index(&self) -> Result<()> {
257 let ids: Vec<SpaceId> = self.spaces.read().keys().cloned().collect();
258 let index_path = self.root_dir.join("_index.json");
259 let json = serde_json::to_string_pretty(&ids)?;
260 std::fs::write(index_path, json)?;
261 Ok(())
262 }
263
264 fn reindex_path_matcher(&self) {
266 let spaces = self.spaces.read();
267 let mut matcher = self.path_matcher.write();
268 *matcher = PathMatcher::default();
269 for space in spaces.values() {
270 matcher.register(space);
271 }
272 }
273
274 #[allow(clippy::await_holding_lock)]
281 pub async fn detect_or_create(
282 &self,
283 message: &str,
284 buffer: &ConversationBuffer,
285 ) -> Result<SpaceId> {
286 let spaces = self.spaces.read().clone();
287 let spaces_vec: Vec<_> = spaces.into_values().collect();
288
289 if let Some(path) = detection::extract_filesystem_path(message) {
291 let matcher = self.path_matcher.read();
293 if let Some(space_id) = matcher.find_space(&path) {
294 self.activate(&space_id).await?;
295 return Ok(space_id);
296 }
297
298 let name = detection::path_name(&path);
300 let mut space = Space::from_path(&path);
301 space.name = name;
302 space.workspace_dir = self.default_workspace_dir(&space.id);
303 space.tags.push(path.to_string_lossy().to_string());
304
305 self.add_space(space).await?;
306 let space_id = self.current_space_id();
307
308 self.event_bus.publish(KernelEvent::SpaceCreated {
309 space_id,
310 name: "unnamed".to_string(),
311 source: "auto_resource".to_string(),
312 })?;
313
314 return Ok(space_id);
315 }
316
317 if let Some(space_id) = detection::match_keywords(message, &spaces_vec) {
319 self.activate(&space_id).await?;
320 return Ok(space_id);
321 }
322
323 let should_check = buffer.should_check_topic(3);
325 if should_check {
326 let topic = detection::classify_topic_stub(message);
327
328 if topic.is_clear() {
329 if self.is_in_default_space() {
331 let new_space = self.promote_from_default(&topic.name).await?;
333 return Ok(new_space);
334 }
335
336 if self.topic_shifted(&topic) {
338 if let Some(space_id) = self.find_by_topic(&topic.name) {
339 self.activate(&space_id).await?;
340 return Ok(space_id);
341 }
342
343 let space = self.create_from_topic(&topic.name).await?;
345 return Ok(space.id);
346 }
347 }
348 }
349
350 Ok(self.current_space_id())
352 }
353
354 fn topic_shifted(&self, new_topic: &Topic) -> bool {
356 let current = self.current_space();
357 if let Some(space) = current {
358 if space.is_default() {
359 return true; }
361 let current_lower = space.name.to_lowercase();
363 let new_lower = new_topic.name.to_lowercase();
364 !current_lower.is_empty() && current_lower != new_lower
365 } else {
366 true
367 }
368 }
369
370 fn find_by_topic(&self, topic: &str) -> Option<SpaceId> {
372 let spaces = self.spaces.read();
373 let topic_lower = topic.to_lowercase();
374
375 for space in spaces.values() {
377 if space.name.to_lowercase() == topic_lower {
378 return Some(space.id);
379 }
380 }
381
382 for space in spaces.values() {
384 for tag in &space.tags {
385 if tag.to_lowercase() == topic_lower {
386 return Some(space.id);
387 }
388 }
389 }
390
391 None
392 }
393
394 async fn promote_from_default(&self, topic: &str) -> Result<SpaceId> {
399 let default_id = default_space_id();
400
401 {
403 let mut spaces = self.spaces.write();
404 if let Some(default) = spaces.get_mut(&default_id) {
405 default.name = String::new();
406 default.deactivate();
407 }
408 }
409
410 let mut new_space = Space::from_topic(topic);
412 new_space.workspace_dir = self.default_workspace_dir(&new_space.id);
413 new_space.active = true;
414
415 let new_id = new_space.id;
416 self.add_space(new_space).await?;
417
418 *self.current_space_id.write() = new_id;
420
421 self.event_bus.publish(KernelEvent::SpaceActivated {
422 space_id: new_id,
423 name: topic.to_string(),
424 })?;
425
426 tracing::info!(topic, "Promoted default Space to named Space");
427 Ok(new_id)
428 }
429
430 pub async fn create_from_topic(&self, topic: &str) -> Result<Space> {
432 let mut space = Space::from_topic(topic);
433 space.workspace_dir = self.default_workspace_dir(&space.id);
434
435 space.add_tag("topic");
437 space.add_tag(topic);
438
439 self.add_space(space.clone()).await?;
440
441 self.event_bus.publish(KernelEvent::SpaceCreated {
442 space_id: space.id,
443 name: space.name.clone(),
444 source: "auto_topic".to_string(),
445 })?;
446
447 Ok(space)
448 }
449
450 pub async fn create_from_path(&self, name: &str, path: &Path) -> Result<Space> {
452 let mut space = Space::from_path(path);
453 if !name.is_empty() {
454 space.name = name.to_string();
455 }
456 space.workspace_dir = self.default_workspace_dir(&space.id);
457
458 self.add_space(space.clone()).await?;
459
460 self.event_bus.publish(KernelEvent::SpaceCreated {
461 space_id: space.id,
462 name: space.name.clone(),
463 source: "auto_resource".to_string(),
464 })?;
465
466 Ok(space)
467 }
468
469 pub async fn activate(&self, space_id: &SpaceId) -> Result<()> {
471 {
472 let mut spaces = self.spaces.write();
473 for (id, space) in spaces.iter_mut() {
474 if *id == *space_id {
475 space.activate();
476 space.touch();
477 } else {
478 space.deactivate();
479 }
480 }
481 }
482 *self.current_space_id.write() = *space_id;
483
484 let space = self.current_space();
485 let (id, name) = if let Some(s) = space {
486 (s.id, s.name.clone())
487 } else {
488 (*space_id, String::new())
489 };
490
491 self.save_space(&Space {
492 id,
493 name: name.clone(),
494 source: SpaceSource::Manual,
495 paths: Vec::new(),
496 workspace_dir: self.default_workspace_dir(&id),
497 tags: Vec::new(),
498 active: true,
499 created_at: Utc::now(),
500 last_active_at: Utc::now(),
501 interaction_count: 1,
502 knowledge_visible: true,
503 })
504 .await
505 .ok(); self.event_bus.publish(KernelEvent::SpaceActivated {
508 space_id: *space_id,
509 name,
510 })?;
511
512 Ok(())
513 }
514
515 pub async fn get_space(&self, space_id: &SpaceId) -> Result<Option<Space>> {
517 Ok(self.spaces.read().get(space_id).cloned())
518 }
519
520 pub fn list(&self) -> Vec<Space> {
522 self.spaces.read().values().cloned().collect()
523 }
524
525 pub fn current_space_id(&self) -> SpaceId {
527 *self.current_space_id.read()
528 }
529
    /// Returns the id of the default Space by delegating to the module-level
    /// `default_space_id` function.
    pub fn default_space_id(&self) -> SpaceId {
        default_space_id()
    }
534
535 pub fn current_space(&self) -> Option<Space> {
537 let current_id = self.current_space_id();
538 self.spaces.read().get(¤t_id).cloned()
539 }
540
541 pub fn is_in_default_space(&self) -> bool {
543 let current = self.current_space();
544 current.map(|s| s.is_default()).unwrap_or(true)
545 }
546
547 pub async fn merge_spaces(&self, survivor_id: SpaceId, absorbed_id: SpaceId) -> Result<()> {
552 if survivor_id == absorbed_id {
553 bail!(SpaceManagerError::SelfMerge);
554 }
555
556 let (mut survivor, absorbed) = {
558 let spaces = self.spaces.read();
559 let s = spaces.get(&survivor_id).cloned();
560 let a = spaces.get(&absorbed_id).cloned();
561 match (s, a) {
562 (Some(sv), Some(av)) => (sv, av),
563 _ => bail!(SpaceManagerError::NotFound(survivor_id)),
564 }
565 };
566
567 let entries_migrated = 0; survivor.last_active_at = Utc::now();
572 survivor.interaction_count += absorbed.interaction_count;
573
574 for tag in absorbed.tags {
576 survivor.add_tag(tag);
577 }
578
579 for path in absorbed.paths {
581 if !survivor.paths.contains(&path) {
582 survivor.paths.push(path);
583 }
584 }
585
586 self.save_space(&survivor).await?;
588
589 {
590 let mut spaces = self.spaces.write();
591 spaces.remove(&absorbed_id);
592 }
593
594 let absorbed_dir = self.root_dir.join(absorbed_id.to_string());
596 let archived_dir = self
597 .root_dir
598 .join("_archived")
599 .join(absorbed_id.to_string());
600 if absorbed_dir.exists() {
601 let _ = std::fs::create_dir_all(archived_dir.parent().unwrap());
602 let _ = std::fs::rename(&absorbed_dir, &archived_dir);
603 }
604
605 self.reindex_path_matcher();
607 self.save_index().await?;
608
609 self.event_bus.publish(KernelEvent::SpacesMerged {
611 survivor: survivor_id,
612 absorbed: absorbed_id,
613 entries_migrated,
614 })?;
615
616 tracing::info!(
617 survivor = %survivor_id,
618 absorbed = %absorbed_id,
619 "Spaces merged"
620 );
621
622 Ok(())
623 }
624
625 pub fn should_auto_merge(&self, a: &Space, b: &Space) -> bool {
627 if a.paths.is_empty() || b.paths.is_empty() {
629 return false;
630 }
631
632 let paths_overlap = a.paths.iter().any(|ap| {
633 b.paths
634 .iter()
635 .any(|bp| ap == bp || ap.starts_with(bp) || bp.starts_with(ap))
636 });
637
638 if !paths_overlap {
639 return false;
640 }
641
642 let a_tags: std::collections::HashSet<_> =
644 a.tags.iter().map(|t| t.to_lowercase()).collect();
645 let b_tags: std::collections::HashSet<_> =
646 b.tags.iter().map(|t| t.to_lowercase()).collect();
647
648 if a_tags.is_empty() && b_tags.is_empty() {
649 }
651
652 let both_low_activity = a.interaction_count < 5 && b.interaction_count < 5;
654
655 paths_overlap && both_low_activity
656 }
657
658 pub async fn archive_stale(&self) -> Result<Vec<SpaceId>> {
660 let cutoff = Utc::now() - chrono::Duration::days(MAX_ARCHIVE_AGE_DAYS);
661 let mut archived = Vec::new();
662
663 let stale_ids: Vec<SpaceId> = {
664 let spaces = self.spaces.read();
665 spaces
666 .values()
667 .filter(|s| s.id != default_space_id() && s.last_active_at < cutoff)
668 .map(|s| s.id)
669 .collect()
670 };
671
672 for id in stale_ids {
673 self.archive_space(&id).await?;
674 archived.push(id);
675 }
676
677 if !archived.is_empty() {
678 tracing::info!(count = archived.len(), "Archived stale Spaces");
679 }
680
681 Ok(archived)
682 }
683
684 async fn archive_space(&self, space_id: &SpaceId) -> Result<()> {
686 let space = {
687 let spaces = self.spaces.read();
688 spaces.get(space_id).cloned()
689 };
690
691 let space = match space {
692 Some(s) => s,
693 None => return Ok(()),
694 };
695
696 let src = self.root_dir.join(space_id.to_string());
698 let dst = self.root_dir.join("_archived").join(space_id.to_string());
699 if src.exists() {
700 std::fs::create_dir_all(dst.parent().unwrap())?;
701 std::fs::rename(&src, &dst)?;
702 }
703
704 {
706 let mut spaces = self.spaces.write();
707 spaces.remove(space_id);
708 }
709
710 self.save_index().await?;
711 self.reindex_path_matcher();
712
713 self.event_bus.publish(KernelEvent::SpaceArchived {
714 space_id: *space_id,
715 name: space.name,
716 })?;
717
718 Ok(())
719 }
720
721 pub async fn restore_from_archive(&self, space_id: &SpaceId) -> Result<()> {
723 let archived_dir = self.root_dir.join("_archived").join(space_id.to_string());
724
725 if !archived_dir.exists() {
726 bail!("Archived Space not found: {}", space_id);
727 }
728
729 let space_file = archived_dir.join("space.json");
731 let space: Space = if space_file.exists() {
732 serde_json::from_str(&std::fs::read_to_string(&space_file)?)?
733 } else {
734 bail!("Space data not found for {}", space_id);
735 };
736
737 let dst = self.root_dir.join(space_id.to_string());
739 std::fs::create_dir_all(&dst)?;
740 for entry in std::fs::read_dir(&archived_dir)? {
741 let entry = entry?;
742 let file_name = entry.file_name();
743 let src_file = archived_dir.join(&file_name);
744 let dst_file = dst.join(&file_name);
745 if src_file.is_file() {
746 std::fs::copy(&src_file, &dst_file)?;
747 }
748 }
749
750 self.add_space(space).await?;
752
753 let _ = std::fs::remove_dir_all(&archived_dir);
755
756 tracing::info!(space_id = %space_id, "Restored Space from archive");
757 Ok(())
758 }
759
760 pub fn knowledge_bridge(&self) -> Option<Arc<KnowledgeBridge>> {
762 self.knowledge_bridge.clone()
763 }
764
    /// Returns the directory under which all Space directories are stored.
    pub fn root_dir(&self) -> &PathBuf {
        &self.root_dir
    }
769
770 pub fn buffer(&self) -> Arc<Mutex<ConversationBuffer>> {
772 self.buffer.clone()
773 }
774}
775
#[cfg(test)]
mod tests {
    use super::*;
    use crate::space::SpaceSource;

    /// State store backed by a temporary directory.
    fn test_state_store() -> Arc<StateStore> {
        let dir = tempfile::tempdir().unwrap();
        Arc::new(StateStore::new(dir.path().to_path_buf()).unwrap())
    }

    /// Event bus constructed with the argument 64.
    fn test_event_bus() -> EventBus {
        EventBus::new(64)
    }

    /// Manager wired to a fresh store and bus; shared setup for every test.
    async fn fresh_manager() -> SpaceManager {
        let store = test_state_store();
        let bus = test_event_bus();
        SpaceManager::new(store, bus).await.unwrap()
    }

    /// Path used by the tests that create a resource-backed Space.
    fn project_path() -> PathBuf {
        PathBuf::from("/projects/oxios")
    }

    #[tokio::test]
    async fn test_ensure_default_space() {
        let manager = fresh_manager().await;

        let default = manager.get_space(&default_space_id()).await.unwrap();
        assert!(default.is_some());
        assert!(default.unwrap().is_default());
    }

    #[tokio::test]
    async fn test_create_from_path() {
        let manager = fresh_manager().await;

        let path = project_path();
        let space = manager.create_from_path("oxios", &path).await.unwrap();

        assert_eq!(space.name, "oxios");
        assert_eq!(space.paths, vec![path]);
        assert_eq!(space.source, SpaceSource::AutoResource);
    }

    #[tokio::test]
    async fn test_activate() {
        let manager = fresh_manager().await;

        let path = project_path();
        let space = manager.create_from_path("oxios", &path).await.unwrap();

        // Creating a Space does not change which one is current.
        assert_eq!(manager.current_space_id(), default_space_id());

        manager.activate(&space.id).await.unwrap();
        assert_eq!(manager.current_space_id(), space.id);
    }

    #[tokio::test]
    async fn test_is_in_default_space() {
        let manager = fresh_manager().await;

        assert!(manager.is_in_default_space());

        let path = project_path();
        let space = manager.create_from_path("oxios", &path).await.unwrap();
        manager.activate(&space.id).await.unwrap();

        assert!(!manager.is_in_default_space());
    }

    #[tokio::test]
    async fn test_list() {
        let manager = fresh_manager().await;

        // Only the default Space exists at first.
        assert_eq!(manager.list().len(), 1);

        let path = project_path();
        manager.create_from_path("oxios", &path).await.unwrap();

        assert_eq!(manager.list().len(), 2);
    }

    #[tokio::test]
    async fn test_merge_spaces_self_error() {
        let manager = fresh_manager().await;

        let result = manager
            .merge_spaces(default_space_id(), default_space_id())
            .await;
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err().downcast_ref(),
            Some(SpaceManagerError::SelfMerge)
        ));
    }

    #[tokio::test]
    async fn test_should_auto_merge() {
        let manager = fresh_manager().await;

        let path = project_path();

        let mut space1 = Space::from_path(&path);
        space1.name = "oxios-dev".to_string();
        space1.interaction_count = 2;

        let mut space2 = Space::from_path(&path);
        space2.name = "oxios-bugfix".to_string();
        space2.interaction_count = 3;

        // Same path, both low activity: merge candidates.
        assert!(manager.should_auto_merge(&space1, &space2));

        // One side is busy: no auto-merge.
        space1.interaction_count = 10;
        assert!(!manager.should_auto_merge(&space1, &space2));
    }
}