// arcbox-agent 0.0.1-alpha.1
//
// Guest agent for ArcBox VMs
//! Log file watcher for streaming container logs.
//!
//! Uses the `notify` crate's recommended backend for the platform (inotify on
//! Linux; FSEvents or kqueue on macOS) to watch a log file for changes and
//! yields new log lines as they are appended.

use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;

use anyhow::{Context, Result};
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::mpsc;

use arcbox_protocol::agent::LogEntry;

/// Buffer capacity, in bytes, of the `BufReader` used for the initial read of
/// the log file.
const READ_BUFFER_SIZE: usize = 8192;

/// Options controlling which log entries are emitted and how they are rendered.
#[derive(Debug, Clone)]
pub struct LogWatchOptions {
    /// Emit entries whose stream is `"stdout"`.
    pub stdout: bool,
    /// Emit entries whose stream is `"stderr"`.
    pub stderr: bool,
    /// Prefix each entry's data with its timestamp.
    pub timestamps: bool,
    /// Number of trailing lines of existing content to emit before streaming
    /// new ones. Any value `<= 0` emits all existing lines.
    pub tail: i64,
    /// Drop entries older than this Unix timestamp, in seconds. Any value
    /// `<= 0` disables the filter.
    pub since: i64,
    /// Drop entries newer than this Unix timestamp, in seconds. Any value
    /// `<= 0` disables the filter.
    pub until: i64,
}

impl Default for LogWatchOptions {
    /// Both streams enabled, no timestamp prefix, and every numeric filter
    /// (`tail`, `since`, `until`) switched off.
    fn default() -> Self {
        // `0` is the "filter disabled" value for all numeric options.
        const DISABLED: i64 = 0;

        Self {
            timestamps: false,
            stdout: true,
            stderr: true,
            tail: DISABLED,
            since: DISABLED,
            until: DISABLED,
        }
    }
}

/// Watches a log file and streams new entries.
///
/// This function:
/// 1. Reads existing log content (applying tail filter if specified)
/// 2. Sets up a file watcher for new changes
/// 3. Streams new log entries as they are appended
///
/// The returned receiver will yield LogEntry messages until the sender is dropped.
pub async fn watch_log_file(
    log_path: impl AsRef<Path>,
    options: LogWatchOptions,
    cancel: mpsc::Receiver<()>,
) -> Result<mpsc::Receiver<LogEntry>> {
    let log_path = log_path.as_ref().to_path_buf();
    let (tx, rx) = mpsc::channel::<LogEntry>(64);

    // Spawn the watcher task
    tokio::spawn(async move {
        if let Err(e) = run_watcher(log_path, options, tx, cancel).await {
            tracing::error!("Log watcher error: {}", e);
        }
    });

    Ok(rx)
}

/// Internal watcher implementation.
async fn run_watcher(
    log_path: std::path::PathBuf,
    options: LogWatchOptions,
    tx: mpsc::Sender<LogEntry>,
    mut cancel: mpsc::Receiver<()>,
) -> Result<()> {
    // Open file or wait for it to be created
    let mut file = loop {
        match std::fs::File::open(&log_path) {
            Ok(f) => break f,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                // Wait a bit and retry, or check for cancellation
                tokio::select! {
                    _ = cancel.recv() => {
                        tracing::debug!("Log watcher cancelled while waiting for file");
                        return Ok(());
                    }
                    _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                        continue;
                    }
                }
            }
            Err(e) => return Err(e).context("failed to open log file"),
        }
    };

    // Read initial content with tail filter
    let initial_lines = read_with_tail(&mut file, options.tail)?;

    // Send initial lines
    for line in initial_lines {
        let entry = parse_log_line(&line, &options);
        if should_include_entry(&entry, &options) {
            if tx.send(entry).await.is_err() {
                // Receiver dropped
                return Ok(());
            }
        }
    }

    // Get current file position
    let mut pos = file.seek(SeekFrom::End(0))?;

    // Set up file watcher
    let (notify_tx, mut notify_rx) = mpsc::channel::<notify::Result<notify::Event>>(16);

    let mut watcher = RecommendedWatcher::new(
        move |res| {
            let _ = notify_tx.blocking_send(res);
        },
        Config::default(),
    )
    .context("failed to create file watcher")?;

    watcher
        .watch(&log_path, RecursiveMode::NonRecursive)
        .context("failed to watch log file")?;

    tracing::debug!("Started watching log file: {:?}", log_path);

    // Watch for changes
    loop {
        tokio::select! {
            _ = cancel.recv() => {
                tracing::debug!("Log watcher cancelled");
                break;
            }
            Some(event) = notify_rx.recv() => {
                match event {
                    Ok(event) => {
                        if event.kind.is_modify() || event.kind.is_create() {
                            // Read new content
                            if let Ok(new_lines) = read_new_content(&mut file, &mut pos) {
                                for line in new_lines {
                                    let entry = parse_log_line(&line, &options);
                                    if should_include_entry(&entry, &options) {
                                        if tx.send(entry).await.is_err() {
                                            // Receiver dropped
                                            return Ok(());
                                        }
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!("File watcher error: {}", e);
                    }
                }
            }
        }
    }

    Ok(())
}

/// Reads the file from its current position to EOF and returns its lines,
/// limited to the last `tail` lines.
///
/// * `tail <= 0` returns every line.
/// * `tail > 0` returns at most the last `tail` lines; only that many lines
///   are held in memory at any time.
///
/// Line terminators are stripped. Blank lines are kept.
///
/// # Errors
///
/// Returns an error if reading fails or the content is not valid UTF-8.
fn read_with_tail(file: &mut std::fs::File, tail: i64) -> Result<Vec<String>> {
    let reader = BufReader::with_capacity(READ_BUFFER_SIZE, &*file);

    // Negative values fail the conversion and zero is filtered out: both mean
    // "no limit". A value too large for `usize` can never be exceeded by the
    // number of lines, so it is equivalent to "no limit" as well.
    let limit = match usize::try_from(tail) {
        Ok(n) if n > 0 => n,
        _ => return Ok(reader.lines().collect::<std::io::Result<Vec<_>>>()?),
    };

    // Sliding window over the most recent `limit` lines. The initial capacity
    // is capped so a huge `tail` does not pre-allocate a huge buffer.
    let mut window = std::collections::VecDeque::with_capacity(limit.min(1024));
    for line in reader.lines() {
        let line = line?;
        if window.len() == limit {
            window.pop_front();
        }
        window.push_back(line);
    }

    Ok(window.into())
}

/// Reads the lines appended to `file` since byte offset `*pos` and advances
/// `*pos` to the file's read position afterwards.
///
/// If the file is now shorter than `*pos` it is assumed to have been
/// truncated, and reading restarts from the beginning. Without this check a
/// truncated file would yield nothing until it grew past the stale offset.
///
/// Trailing `\n` / `\r` characters are stripped and lines that are empty after
/// stripping are dropped. A read error on an individual line is logged and
/// ends the read early; the lines collected so far are still returned.
///
/// # Errors
///
/// Returns an error if the file's metadata cannot be read or a seek fails.
fn read_new_content(file: &mut std::fs::File, pos: &mut u64) -> Result<Vec<String>> {
    // Detect truncation (e.g. the log was cleared in place).
    if file.metadata()?.len() < *pos {
        tracing::debug!("Log file shrank below last read offset; restarting from the beginning");
        *pos = 0;
    }

    // Seek to last known position
    file.seek(SeekFrom::Start(*pos))?;

    let mut reader = BufReader::new(&*file);
    let mut lines = Vec::new();
    let mut line = String::new();

    loop {
        line.clear();
        match reader.read_line(&mut line) {
            Ok(0) => break, // EOF
            Ok(_) => {
                // Remove trailing newline; only allocate for lines we keep.
                let trimmed = line.trim_end_matches(&['\n', '\r'][..]);
                if !trimmed.is_empty() {
                    lines.push(trimmed.to_string());
                }
            }
            Err(e) => {
                tracing::warn!("Error reading log line: {}", e);
                break;
            }
        }
    }

    // Update position
    *pos = file.stream_position()?;

    Ok(lines)
}

/// Converts one raw log line into a `LogEntry`.
///
/// Recognised line shapes:
/// - Plain text: "message"
/// - With stream prefix: "stdout: message" or "stderr: message"
/// - With timestamp: "2024-01-01T00:00:00Z stdout: message"
///
/// A line without a timestamp is stamped with the current UTC time, and a line
/// without a stream prefix is attributed to "stdout". When
/// `options.timestamps` is set, the entry's data is prefixed with the
/// timestamp rendered at nanosecond precision.
fn parse_log_line(line: &str, options: &LogWatchOptions) -> LogEntry {
    // Fallback timestamp for lines that do not carry their own.
    let received_at = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);

    // Shape: [timestamp] [stream]: message
    let (parsed_timestamp, parsed_stream, message) = parse_structured_line(line);

    let timestamp = match parsed_timestamp {
        Some(nanos) => nanos,
        None => received_at,
    };
    let stream = match parsed_stream {
        Some(name) => name,
        None => "stdout".to_string(),
    };

    let payload = if !options.timestamps {
        message
    } else {
        let moment = chrono::DateTime::from_timestamp_nanos(timestamp);
        format!("{} {}", moment.format("%Y-%m-%dT%H:%M:%S%.9fZ"), message)
    };

    LogEntry {
        stream,
        data: payload.into_bytes(),
        timestamp,
    }
}

/// Splits a raw log line into its optional parts.
///
/// Returns `(timestamp, stream, message)`, where `timestamp` is in nanoseconds
/// since the Unix epoch. A timestamp is recognised only when the text before
/// the first space parses as RFC 3339; a stream only when the (remaining) text
/// starts with `"stdout: "` or `"stderr: "`. Anything not recognised stays in
/// the message.
fn parse_structured_line(line: &str) -> (Option<i64>, Option<String>, String) {
    // Splits "stdout: msg" / "stderr: msg" into (stream, msg).
    let split_stream = |text: &str| -> Option<(String, String)> {
        let (prefix, message) = text.split_once(": ")?;
        matches!(prefix, "stdout" | "stderr").then(|| (prefix.to_string(), message.to_string()))
    };

    // A leading RFC 3339 token, if present, together with the text after it.
    let timestamped = line.split_once(' ').and_then(|(head, remainder)| {
        chrono::DateTime::parse_from_rfc3339(head)
            .ok()
            .map(|parsed| (parsed.timestamp_nanos_opt(), remainder))
    });

    match timestamped {
        Some((timestamp, remainder)) => match split_stream(remainder) {
            Some((stream, message)) => (timestamp, Some(stream), message),
            None => (timestamp, None, remainder.to_string()),
        },
        None => match split_stream(line) {
            Some((stream, message)) => (None, Some(stream), message),
            // Plain text
            None => (None, None, line.to_string()),
        },
    }
}

/// Decides whether `entry` passes the stream and time filters in `options`.
///
/// Entries on a stream other than "stdout" / "stderr" are never rejected by
/// the stream filter. Time filters compare the entry's timestamp, truncated to
/// whole seconds, against `since` / `until` (both bounds inclusive); a bound
/// that is not positive is ignored.
fn should_include_entry(entry: &LogEntry, options: &LogWatchOptions) -> bool {
    let stream_enabled = match entry.stream.as_str() {
        "stdout" => options.stdout,
        "stderr" => options.stderr,
        _ => true,
    };
    if !stream_enabled {
        return false;
    }

    // Entry timestamps are in nanoseconds; the filters are in seconds.
    let entry_secs = entry.timestamp / 1_000_000_000;
    let new_enough = options.since <= 0 || entry_secs >= options.since;
    let old_enough = options.until <= 0 || entry_secs <= options.until;

    new_enough && old_enough
}

/// Unit tests for line parsing and entry filtering.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_plain_line() {
        let options = LogWatchOptions::default();
        let entry = parse_log_line("hello world", &options);

        // Lines without a stream prefix default to stdout.
        assert_eq!(entry.stream, "stdout");
        assert_eq!(String::from_utf8_lossy(&entry.data), "hello world");
    }

    #[test]
    fn test_parse_line_with_stream_prefix() {
        let options = LogWatchOptions::default();

        let entry = parse_log_line("stderr: error message", &options);
        assert_eq!(entry.stream, "stderr");
        assert_eq!(String::from_utf8_lossy(&entry.data), "error message");

        let entry = parse_log_line("stdout: info message", &options);
        assert_eq!(entry.stream, "stdout");
        assert_eq!(String::from_utf8_lossy(&entry.data), "info message");
    }

    #[test]
    fn test_parse_line_with_timestamp() {
        let options = LogWatchOptions::default();
        let entry = parse_log_line("2024-01-15T10:30:00Z stdout: test message", &options);

        assert_eq!(entry.stream, "stdout");
        assert_eq!(String::from_utf8_lossy(&entry.data), "test message");
        // 2024-01-15T10:30:00Z is 1_705_314_600 seconds after the Unix epoch;
        // the entry carries it in nanoseconds.
        assert_eq!(entry.timestamp, 1_705_314_600_000_000_000);
    }

    #[test]
    fn test_parse_line_with_timestamps_option() {
        let options = LogWatchOptions {
            timestamps: true,
            ..Default::default()
        };
        let entry = parse_log_line("2024-01-15T10:30:00Z stderr: test message", &options);

        assert_eq!(entry.stream, "stderr");
        // The parsed timestamp is rendered back in front of the message.
        assert_eq!(
            String::from_utf8_lossy(&entry.data),
            "2024-01-15T10:30:00.000000000Z test message"
        );
    }

    #[test]
    fn test_should_include_entry_stdout_only() {
        let options = LogWatchOptions {
            stdout: true,
            stderr: false,
            ..Default::default()
        };

        let stdout_entry = LogEntry {
            stream: "stdout".to_string(),
            data: vec![],
            timestamp: 0,
        };
        let stderr_entry = LogEntry {
            stream: "stderr".to_string(),
            data: vec![],
            timestamp: 0,
        };

        assert!(should_include_entry(&stdout_entry, &options));
        assert!(!should_include_entry(&stderr_entry, &options));
    }

    #[test]
    fn test_should_include_entry_time_filter() {
        let options = LogWatchOptions {
            since: 1000,
            until: 2000,
            ..Default::default()
        };

        let before = LogEntry {
            stream: "stdout".to_string(),
            data: vec![],
            timestamp: 500_000_000_000, // 500 seconds in nanos
        };
        let during = LogEntry {
            stream: "stdout".to_string(),
            data: vec![],
            timestamp: 1_500_000_000_000, // 1500 seconds in nanos
        };
        let after = LogEntry {
            stream: "stdout".to_string(),
            data: vec![],
            timestamp: 2_500_000_000_000, // 2500 seconds in nanos
        };

        assert!(!should_include_entry(&before, &options));
        assert!(should_include_entry(&during, &options));
        assert!(!should_include_entry(&after, &options));
    }

    #[test]
    fn test_parse_structured_line() {
        // Plain text
        let (ts, stream, msg) = parse_structured_line("hello world");
        assert!(ts.is_none());
        assert!(stream.is_none());
        assert_eq!(msg, "hello world");

        // With stream prefix
        let (ts, stream, msg) = parse_structured_line("stderr: error");
        assert!(ts.is_none());
        assert_eq!(stream, Some("stderr".to_string()));
        assert_eq!(msg, "error");

        // With timestamp and stream
        let (ts, stream, msg) =
            parse_structured_line("2024-01-15T10:30:00+00:00 stdout: message");
        assert!(ts.is_some());
        assert_eq!(stream, Some("stdout".to_string()));
        assert_eq!(msg, "message");

        // With timestamp but no stream prefix: the remainder is the message
        let (ts, stream, msg) = parse_structured_line("2024-01-15T10:30:00Z just text");
        assert!(ts.is_some());
        assert!(stream.is_none());
        assert_eq!(msg, "just text");
    }
}