agtrace_runtime/client/
watch_service.rs

1use crate::client::{MonitorBuilder, StreamHandle};
2use crate::config::Config;
3use crate::runtime::SessionStreamer;
4use crate::{Error, Result};
5use agtrace_core::project_hash_from_root;
6use agtrace_index::Database;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10#[derive(Clone)]
11pub struct WatchService {
12    db: Arc<Mutex<Database>>,
13    config: Arc<Config>,
14    provider_configs: Arc<Vec<(String, PathBuf)>>,
15}
16
17impl WatchService {
18    pub fn new(
19        db: Arc<Mutex<Database>>,
20        config: Arc<Config>,
21        provider_configs: Arc<Vec<(String, PathBuf)>>,
22    ) -> Self {
23        Self {
24            db,
25            config,
26            provider_configs,
27        }
28    }
29
30    pub fn watch_session(&self, session_id: &str) -> Result<StreamHandle> {
31        // Try to resolve short ID to full ID from database
32        // If not found, use the provided session_id as-is (might be a full ID for a new session)
33        let resolved_id = {
34            let db = self.db.lock().unwrap();
35            if let Some(session) = db.get_session_by_id(session_id)? {
36                session.id
37            } else if let Some(full_id) = db.find_session_by_prefix(session_id)? {
38                full_id
39            } else {
40                // Not in database - use as-is and let filesystem scan handle it
41                session_id.to_string()
42            }
43        };
44
45        let session_opt = {
46            let db = self.db.lock().unwrap();
47            db.get_session_by_id(&resolved_id)?
48        };
49
50        let streamer = if let Some(session) = session_opt {
51            // Session exists in database, use normal attach
52            let adapter = agtrace_providers::create_adapter(&session.provider)?;
53            SessionStreamer::attach(resolved_id.clone(), self.db.clone(), Arc::new(adapter))?
54        } else {
55            // Session not in database yet, scan filesystem directly
56            // Try each configured provider until we find the session
57            let mut last_error = None;
58
59            for (provider_name, log_root) in self.provider_configs.iter() {
60                match agtrace_providers::create_adapter(provider_name) {
61                    Ok(adapter) => {
62                        match SessionStreamer::attach_from_filesystem(
63                            resolved_id.clone(),
64                            log_root.clone(),
65                            Arc::new(adapter),
66                        ) {
67                            Ok(streamer) => return Ok(StreamHandle::new(streamer)),
68                            Err(e) => last_error = Some(e),
69                        }
70                    }
71                    Err(e) => last_error = Some(Error::Provider(e)),
72                }
73            }
74
75            return Err(last_error.unwrap_or_else(|| {
76                Error::InvalidOperation(format!("Session not found: {}", resolved_id))
77            }));
78        };
79
80        Ok(StreamHandle::new(streamer))
81    }
82
83    pub fn watch_provider(&self, provider_name: &str) -> Result<MonitorBuilder> {
84        let log_root = self
85            .provider_configs
86            .iter()
87            .find(|(name, _)| name == provider_name)
88            .map(|(_, path)| path.clone())
89            .ok_or_else(|| {
90                Error::InvalidOperation(format!("Provider '{}' not configured", provider_name))
91            })?;
92
93        Ok(MonitorBuilder::new(
94            self.db.clone(),
95            Arc::new(vec![(provider_name.to_string(), log_root)]),
96        ))
97    }
98
99    pub fn watch_all_providers(&self) -> Result<MonitorBuilder> {
100        Ok(MonitorBuilder::new(
101            self.db.clone(),
102            self.provider_configs.clone(),
103        ))
104    }
105
106    pub fn config(&self) -> &Config {
107        &self.config
108    }
109
110    pub fn database(&self) -> Arc<Mutex<Database>> {
111        self.db.clone()
112    }
113
114    /// Find the provider with the most recently updated session.
115    ///
116    /// If `project_root` is specified, only sessions from that project are considered.
117    /// Otherwise, searches across all projects.
118    ///
119    /// Returns the provider name with the most recent session, or None if no sessions found.
120    ///
121    /// # Design Rationale
122    /// - Watch mode needs real-time detection of "most recently updated" sessions
123    /// - Cannot rely on DB indexing since watch bypasses DB for real-time monitoring
124    /// - Directly scans filesystem using LogDiscovery::scan_sessions()
125    /// - Uses SessionIndex.latest_mod_time (file modification time) instead of timestamp (creation time)
126    /// - This enables switching to sessions that are actively being updated, even if created earlier
127    /// - Filters by project_root to ensure watch attaches to sessions in the current project only
128    pub fn find_most_recent_provider(&self, project_root: Option<&Path>) -> Option<String> {
129        // Calculate target project hash if project_root is specified
130        let target_project_hash =
131            project_root.map(|root| project_hash_from_root(&root.display().to_string()));
132
133        // Track the most recently updated session across all providers
134        let mut most_recent: Option<(String, String)> = None; // (provider_name, latest_mod_time)
135
136        for (provider_name, log_root) in self.provider_configs.iter() {
137            // Create adapter for this provider
138            let adapter = match agtrace_providers::create_adapter(provider_name) {
139                Ok(a) => a,
140                Err(_) => continue,
141            };
142
143            // Scan filesystem directly (bypassing DB for real-time detection)
144            let sessions = match adapter.discovery.scan_sessions(log_root) {
145                Ok(s) => s,
146                Err(_) => continue,
147            };
148
149            // Find the session with the latest modification time in this provider
150            for session in sessions {
151                // Filter by project if project_root is specified
152                if let Some(ref target_hash) = target_project_hash {
153                    let session_hash = session
154                        .project_root
155                        .as_ref()
156                        .map(|root| project_hash_from_root(&root.to_string_lossy()));
157                    if session_hash.as_ref() != Some(target_hash) {
158                        continue;
159                    }
160                }
161
162                if let Some(ref mod_time) = session.latest_mod_time
163                    && (most_recent.is_none()
164                        || Some(mod_time) > most_recent.as_ref().map(|(_, t)| t))
165                {
166                    most_recent = Some((provider_name.clone(), mod_time.clone()));
167                }
168            }
169        }
170
171        most_recent.map(|(provider, _)| provider)
172    }
173}