agtrace_runtime/client/
watch_service.rs

1use crate::client::{MonitorBuilder, StreamHandle};
2use crate::config::Config;
3use crate::runtime::SessionStreamer;
4use agtrace_index::Database;
5use anyhow::Result;
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9#[derive(Clone)]
10pub struct WatchService {
11    db: Arc<Mutex<Database>>,
12    config: Arc<Config>,
13    provider_configs: Arc<Vec<(String, PathBuf)>>,
14}
15
16impl WatchService {
17    pub fn new(
18        db: Arc<Mutex<Database>>,
19        config: Arc<Config>,
20        provider_configs: Arc<Vec<(String, PathBuf)>>,
21    ) -> Self {
22        Self {
23            db,
24            config,
25            provider_configs,
26        }
27    }
28
29    pub fn watch_session(&self, session_id: &str) -> Result<StreamHandle> {
30        // Try to resolve short ID to full ID from database
31        // If not found, use the provided session_id as-is (might be a full ID for a new session)
32        let resolved_id = {
33            let db = self.db.lock().unwrap();
34            if let Some(session) = db.get_session_by_id(session_id)? {
35                session.id
36            } else if let Some(full_id) = db.find_session_by_prefix(session_id)? {
37                full_id
38            } else {
39                // Not in database - use as-is and let filesystem scan handle it
40                session_id.to_string()
41            }
42        };
43
44        let session_opt = {
45            let db = self.db.lock().unwrap();
46            db.get_session_by_id(&resolved_id)?
47        };
48
49        let streamer = if let Some(session) = session_opt {
50            // Session exists in database, use normal attach
51            let adapter = agtrace_providers::create_adapter(&session.provider)?;
52            SessionStreamer::attach(resolved_id.clone(), self.db.clone(), Arc::new(adapter))?
53        } else {
54            // Session not in database yet, scan filesystem directly
55            // Try each configured provider until we find the session
56            let mut last_error = None;
57
58            for (provider_name, log_root) in self.provider_configs.iter() {
59                match agtrace_providers::create_adapter(provider_name) {
60                    Ok(adapter) => {
61                        match SessionStreamer::attach_from_filesystem(
62                            resolved_id.clone(),
63                            log_root.clone(),
64                            Arc::new(adapter),
65                        ) {
66                            Ok(streamer) => return Ok(StreamHandle::new(streamer)),
67                            Err(e) => last_error = Some(e),
68                        }
69                    }
70                    Err(e) => last_error = Some(e),
71                }
72            }
73
74            return Err(last_error
75                .unwrap_or_else(|| anyhow::anyhow!("No providers configured"))
76                .context(format!("Session not found: {}", resolved_id)));
77        };
78
79        Ok(StreamHandle::new(streamer))
80    }
81
82    pub fn watch_provider(&self, provider_name: &str) -> Result<MonitorBuilder> {
83        let log_root = self
84            .provider_configs
85            .iter()
86            .find(|(name, _)| name == provider_name)
87            .map(|(_, path)| path.clone())
88            .ok_or_else(|| anyhow::anyhow!("Provider '{}' not configured", provider_name))?;
89
90        Ok(MonitorBuilder::new(
91            self.db.clone(),
92            Arc::new(vec![(provider_name.to_string(), log_root)]),
93        ))
94    }
95
96    pub fn config(&self) -> &Config {
97        &self.config
98    }
99
100    pub fn database(&self) -> Arc<Mutex<Database>> {
101        self.db.clone()
102    }
103}