1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::logs::LogStreamer;
use crate::models::{LogEntry, LogSource, Service};
use anyhow::Result;
use tokio::sync::mpsc;
use tracing::debug;
/// Registry of log streamers, kept in the order they were registered.
///
/// Each entry pairs a caller-supplied identifier with a boxed [`LogStreamer`]
/// trait object.
pub struct LogStreamerRegistry {
/// `(identifier, streamer)` pairs; a `Vec` is used so that insertion order
/// is preserved when the entries are iterated.
streamers: Vec<(String, Box<dyn LogStreamer>)>,
}
impl LogStreamerRegistry {
pub fn new() -> Self {
Self {
streamers: Vec::new(),
}
}
/// Register a log streamer with a unique identifier
/// Streamers are tried in registration order (first match wins)
pub fn register(&mut self, id: String, streamer: Box<dyn LogStreamer>) {
debug!("Registering log streamer: {}", id);
self.streamers.push((id, streamer));
}
/// Start streaming logs for a service using registered streamers
pub async fn start_streaming(
&self,
service: &Service,
) -> Result<mpsc::UnboundedReceiver<LogEntry>> {
let (tx, rx) = mpsc::unbounded_channel();
// Try streamers in registration order
for (id, streamer) in &self.streamers {
if streamer.can_handle(service) {
debug!("Using log streamer '{}' for service '{}'", id, service.name);
let service_clone = service.clone();
let tx_clone = tx.clone();
// Clone the streamer's source type to determine how to spawn
let source_type = streamer.source_type();
// Spawn the streaming task based on source type
// Note: We need to create new instances for each spawn since we can't clone trait objects
match source_type {
LogSource::Docker => {
let docker_streamer = crate::logs::docker::DockerLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = docker_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("Docker log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::Systemd => {
let systemd_streamer = crate::logs::systemd::SystemdLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = systemd_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("Systemd log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::File => {
let file_streamer = crate::logs::file::FileLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = file_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("File log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::Process => {
let process_streamer = crate::logs::process::ProcessLogStreamer::new();
tokio::spawn(async move {
if let Err(e) = process_streamer.stream_logs(&service_clone, tx_clone).await {
tracing::error!("Process log streaming error for {}: {}", service_clone.name, e);
}
});
}
LogSource::Custom => {
// For custom streamers, we need to call the trait method directly
// This requires cloning the service and channel
let streamer_id = id.clone();
tokio::spawn(async move {
// We can't easily call stream_logs here without cloning the streamer
// This is a limitation - custom streamers should handle their own spawning
tracing::warn!("Custom log streamer '{}' needs to handle spawning internally", streamer_id);
});
}
}
return Ok(rx);
}
}
// No streamer found
anyhow::bail!("No log streamer available for service: {}", service.name)
}
/// Get list of all registered streamer IDs
pub fn registered_ids(&self) -> Vec<String> {
self.streamers.iter().map(|(id, _)| id.clone()).collect()
}
}
impl Default for LogStreamerRegistry {
fn default() -> Self {
Self::new()
}
}