agtrace_runtime/client/
watch_service.rs1use crate::client::{MonitorBuilder, StreamHandle};
2use crate::config::Config;
3use crate::runtime::SessionStreamer;
4use agtrace_index::Database;
5use anyhow::Result;
6use std::path::PathBuf;
7use std::sync::{Arc, Mutex};
8
9#[derive(Clone)]
10pub struct WatchService {
11 db: Arc<Mutex<Database>>,
12 config: Arc<Config>,
13 provider_configs: Arc<Vec<(String, PathBuf)>>,
14}
15
16impl WatchService {
17 pub fn new(
18 db: Arc<Mutex<Database>>,
19 config: Arc<Config>,
20 provider_configs: Arc<Vec<(String, PathBuf)>>,
21 ) -> Self {
22 Self {
23 db,
24 config,
25 provider_configs,
26 }
27 }
28
29 pub fn watch_session(&self, session_id: &str) -> Result<StreamHandle> {
30 let resolved_id = {
33 let db = self.db.lock().unwrap();
34 if let Some(session) = db.get_session_by_id(session_id)? {
35 session.id
36 } else if let Some(full_id) = db.find_session_by_prefix(session_id)? {
37 full_id
38 } else {
39 session_id.to_string()
41 }
42 };
43
44 let session_opt = {
45 let db = self.db.lock().unwrap();
46 db.get_session_by_id(&resolved_id)?
47 };
48
49 let streamer = if let Some(session) = session_opt {
50 let adapter = agtrace_providers::create_adapter(&session.provider)?;
52 SessionStreamer::attach(resolved_id.clone(), self.db.clone(), Arc::new(adapter))?
53 } else {
54 let mut last_error = None;
57
58 for (provider_name, log_root) in self.provider_configs.iter() {
59 match agtrace_providers::create_adapter(provider_name) {
60 Ok(adapter) => {
61 match SessionStreamer::attach_from_filesystem(
62 resolved_id.clone(),
63 log_root.clone(),
64 Arc::new(adapter),
65 ) {
66 Ok(streamer) => return Ok(StreamHandle::new(streamer)),
67 Err(e) => last_error = Some(e),
68 }
69 }
70 Err(e) => last_error = Some(e),
71 }
72 }
73
74 return Err(last_error
75 .unwrap_or_else(|| anyhow::anyhow!("No providers configured"))
76 .context(format!("Session not found: {}", resolved_id)));
77 };
78
79 Ok(StreamHandle::new(streamer))
80 }
81
82 pub fn watch_provider(&self, provider_name: &str) -> Result<MonitorBuilder> {
83 let log_root = self
84 .provider_configs
85 .iter()
86 .find(|(name, _)| name == provider_name)
87 .map(|(_, path)| path.clone())
88 .ok_or_else(|| anyhow::anyhow!("Provider '{}' not configured", provider_name))?;
89
90 Ok(MonitorBuilder::new(
91 self.db.clone(),
92 Arc::new(vec![(provider_name.to_string(), log_root)]),
93 ))
94 }
95
96 pub fn config(&self) -> &Config {
97 &self.config
98 }
99
100 pub fn database(&self) -> Arc<Mutex<Database>> {
101 self.db.clone()
102 }
103}