agtrace_runtime/client/
watch_service.rs1use crate::client::{MonitorBuilder, StreamHandle};
2use crate::config::Config;
3use crate::runtime::SessionStreamer;
4use crate::{Error, Result};
5use agtrace_core::project_hash_from_root;
6use agtrace_index::Database;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10#[derive(Clone)]
11pub struct WatchService {
12 db: Arc<Mutex<Database>>,
13 config: Arc<Config>,
14 provider_configs: Arc<Vec<(String, PathBuf)>>,
15}
16
17impl WatchService {
18 pub fn new(
19 db: Arc<Mutex<Database>>,
20 config: Arc<Config>,
21 provider_configs: Arc<Vec<(String, PathBuf)>>,
22 ) -> Self {
23 Self {
24 db,
25 config,
26 provider_configs,
27 }
28 }
29
30 pub fn watch_session(&self, session_id: &str) -> Result<StreamHandle> {
31 let resolved_id = {
34 let db = self.db.lock().unwrap();
35 if let Some(session) = db.get_session_by_id(session_id)? {
36 session.id
37 } else if let Some(full_id) = db.find_session_by_prefix(session_id)? {
38 full_id
39 } else {
40 session_id.to_string()
42 }
43 };
44
45 let session_opt = {
46 let db = self.db.lock().unwrap();
47 db.get_session_by_id(&resolved_id)?
48 };
49
50 let streamer = if let Some(session) = session_opt {
51 let adapter = agtrace_providers::create_adapter(&session.provider)?;
53 SessionStreamer::attach(resolved_id.clone(), self.db.clone(), Arc::new(adapter))?
54 } else {
55 let mut last_error = None;
58
59 for (provider_name, log_root) in self.provider_configs.iter() {
60 match agtrace_providers::create_adapter(provider_name) {
61 Ok(adapter) => {
62 match SessionStreamer::attach_from_filesystem(
63 resolved_id.clone(),
64 log_root.clone(),
65 Arc::new(adapter),
66 ) {
67 Ok(streamer) => return Ok(StreamHandle::new(streamer)),
68 Err(e) => last_error = Some(e),
69 }
70 }
71 Err(e) => last_error = Some(Error::Provider(e)),
72 }
73 }
74
75 return Err(last_error.unwrap_or_else(|| {
76 Error::InvalidOperation(format!("Session not found: {}", resolved_id))
77 }));
78 };
79
80 Ok(StreamHandle::new(streamer))
81 }
82
83 pub fn watch_provider(&self, provider_name: &str) -> Result<MonitorBuilder> {
84 let log_root = self
85 .provider_configs
86 .iter()
87 .find(|(name, _)| name == provider_name)
88 .map(|(_, path)| path.clone())
89 .ok_or_else(|| {
90 Error::InvalidOperation(format!("Provider '{}' not configured", provider_name))
91 })?;
92
93 Ok(MonitorBuilder::new(
94 self.db.clone(),
95 Arc::new(vec![(provider_name.to_string(), log_root)]),
96 ))
97 }
98
99 pub fn watch_all_providers(&self) -> Result<MonitorBuilder> {
100 Ok(MonitorBuilder::new(
101 self.db.clone(),
102 self.provider_configs.clone(),
103 ))
104 }
105
106 pub fn config(&self) -> &Config {
107 &self.config
108 }
109
110 pub fn database(&self) -> Arc<Mutex<Database>> {
111 self.db.clone()
112 }
113
114 pub fn find_most_recent_provider(&self, project_root: Option<&Path>) -> Option<String> {
129 let target_project_hash =
131 project_root.map(|root| project_hash_from_root(&root.display().to_string()));
132
133 let mut most_recent: Option<(String, String)> = None; for (provider_name, log_root) in self.provider_configs.iter() {
137 let adapter = match agtrace_providers::create_adapter(provider_name) {
139 Ok(a) => a,
140 Err(_) => continue,
141 };
142
143 let sessions = match adapter.discovery.scan_sessions(log_root) {
145 Ok(s) => s,
146 Err(_) => continue,
147 };
148
149 for session in sessions {
151 if let Some(ref target_hash) = target_project_hash {
153 let session_hash = session
154 .project_root
155 .as_ref()
156 .map(|root| project_hash_from_root(&root.to_string_lossy()));
157 if session_hash.as_ref() != Some(target_hash) {
158 continue;
159 }
160 }
161
162 if let Some(ref mod_time) = session.latest_mod_time
163 && (most_recent.is_none()
164 || Some(mod_time) > most_recent.as_ref().map(|(_, t)| t))
165 {
166 most_recent = Some((provider_name.clone(), mod_time.clone()));
167 }
168 }
169 }
170
171 most_recent.map(|(provider, _)| provider)
172 }
173}