agtrace_runtime/client/
watch_service.rs1use crate::client::{MonitorBuilder, StreamHandle};
2use crate::config::Config;
3use crate::runtime::SessionStreamer;
4use agtrace_index::Database;
5use agtrace_types::project_hash_from_root;
6use anyhow::Result;
7use std::path::{Path, PathBuf};
8use std::sync::{Arc, Mutex};
9
10#[derive(Clone)]
11pub struct WatchService {
12 db: Arc<Mutex<Database>>,
13 config: Arc<Config>,
14 provider_configs: Arc<Vec<(String, PathBuf)>>,
15}
16
17impl WatchService {
18 pub fn new(
19 db: Arc<Mutex<Database>>,
20 config: Arc<Config>,
21 provider_configs: Arc<Vec<(String, PathBuf)>>,
22 ) -> Self {
23 Self {
24 db,
25 config,
26 provider_configs,
27 }
28 }
29
30 pub fn watch_session(&self, session_id: &str) -> Result<StreamHandle> {
31 let resolved_id = {
34 let db = self.db.lock().unwrap();
35 if let Some(session) = db.get_session_by_id(session_id)? {
36 session.id
37 } else if let Some(full_id) = db.find_session_by_prefix(session_id)? {
38 full_id
39 } else {
40 session_id.to_string()
42 }
43 };
44
45 let session_opt = {
46 let db = self.db.lock().unwrap();
47 db.get_session_by_id(&resolved_id)?
48 };
49
50 let streamer = if let Some(session) = session_opt {
51 let adapter = agtrace_providers::create_adapter(&session.provider)?;
53 SessionStreamer::attach(resolved_id.clone(), self.db.clone(), Arc::new(adapter))?
54 } else {
55 let mut last_error = None;
58
59 for (provider_name, log_root) in self.provider_configs.iter() {
60 match agtrace_providers::create_adapter(provider_name) {
61 Ok(adapter) => {
62 match SessionStreamer::attach_from_filesystem(
63 resolved_id.clone(),
64 log_root.clone(),
65 Arc::new(adapter),
66 ) {
67 Ok(streamer) => return Ok(StreamHandle::new(streamer)),
68 Err(e) => last_error = Some(e),
69 }
70 }
71 Err(e) => last_error = Some(e),
72 }
73 }
74
75 return Err(last_error
76 .unwrap_or_else(|| anyhow::anyhow!("No providers configured"))
77 .context(format!("Session not found: {}", resolved_id)));
78 };
79
80 Ok(StreamHandle::new(streamer))
81 }
82
83 pub fn watch_provider(&self, provider_name: &str) -> Result<MonitorBuilder> {
84 let log_root = self
85 .provider_configs
86 .iter()
87 .find(|(name, _)| name == provider_name)
88 .map(|(_, path)| path.clone())
89 .ok_or_else(|| anyhow::anyhow!("Provider '{}' not configured", provider_name))?;
90
91 Ok(MonitorBuilder::new(
92 self.db.clone(),
93 Arc::new(vec![(provider_name.to_string(), log_root)]),
94 ))
95 }
96
97 pub fn watch_all_providers(&self) -> Result<MonitorBuilder> {
98 Ok(MonitorBuilder::new(
99 self.db.clone(),
100 self.provider_configs.clone(),
101 ))
102 }
103
104 pub fn config(&self) -> &Config {
105 &self.config
106 }
107
108 pub fn database(&self) -> Arc<Mutex<Database>> {
109 self.db.clone()
110 }
111
112 pub fn find_most_recent_provider(&self, project_root: Option<&Path>) -> Option<String> {
127 let target_project_hash =
129 project_root.map(|root| project_hash_from_root(&root.display().to_string()));
130
131 let mut most_recent: Option<(String, String)> = None; for (provider_name, log_root) in self.provider_configs.iter() {
135 let adapter = match agtrace_providers::create_adapter(provider_name) {
137 Ok(a) => a,
138 Err(_) => continue,
139 };
140
141 let sessions = match adapter.discovery.scan_sessions(log_root) {
143 Ok(s) => s,
144 Err(_) => continue,
145 };
146
147 for session in sessions {
149 if let Some(ref target_hash) = target_project_hash {
151 let session_hash = session
152 .project_root
153 .as_ref()
154 .map(|root| project_hash_from_root(&root.to_string_lossy()));
155 if session_hash.as_ref() != Some(target_hash) {
156 continue;
157 }
158 }
159
160 if let Some(ref mod_time) = session.latest_mod_time
161 && (most_recent.is_none()
162 || Some(mod_time) > most_recent.as_ref().map(|(_, t)| t))
163 {
164 most_recent = Some((provider_name.clone(), mod_time.clone()));
165 }
166 }
167 }
168
169 most_recent.map(|(provider, _)| provider)
170 }
171}