agentsight 0.2.1

eBPF-based observability for AI agent sessions, prompts, process trees, files, network activity, and token usage.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
use super::*;
use crate::framework::runners::{EventStream, FakeRunner, Runner};
use futures::stream::StreamExt;
use serde_json::json;
use std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
};
use std::time::Instant;
use tempfile::NamedTempFile;
use tokio::time::Duration;

/// Custom test analyzer that simulates errors
struct ErrorSimulatorAnalyzer {
    error_on_event_number: usize,
}

impl ErrorSimulatorAnalyzer {
    fn new(error_on_event_number: usize) -> Self {
        Self {
            error_on_event_number,
        }
    }
}

#[async_trait]
impl Analyzer for ErrorSimulatorAnalyzer {
    async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
        let error_event = self.error_on_event_number;
        let counter = Arc::new(AtomicUsize::new(0));

        let processed_stream = stream.map(move |event| {
            let count = counter.fetch_add(1, Ordering::SeqCst) + 1;
            if count == error_event {
                // Simulate an error condition but don't actually error out
                // Instead, modify the event to indicate an error occurred
                let mut error_event = event;
                if let Some(data) = error_event.data.as_object_mut() {
                    data.insert("analyzer_error".to_string(), json!("Simulated error"));
                }
                error_event
            } else {
                event
            }
        });

        Ok(Box::pin(processed_stream))
    }
}

/// Custom test analyzer that filters events
struct FilterAnalyzer {
    filter_condition: String,
}

impl FilterAnalyzer {
    fn new(filter_condition: String) -> Self {
        Self { filter_condition }
    }
}

#[async_trait]
impl Analyzer for FilterAnalyzer {
    async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
        let condition = self.filter_condition.clone();
        let filtered_stream = stream.filter(move |event| {
            let matches = if condition == "ssl_only" {
                event.source == "ssl"
            } else if condition == "even_pids" {
                event
                    .data
                    .get("pid")
                    .and_then(|v| v.as_u64())
                    .map(|pid| pid % 2 == 0)
                    .unwrap_or(false)
            } else {
                true // No filter
            };
            futures::future::ready(matches)
        });

        Ok(Box::pin(filtered_stream))
    }
}

/// Custom test analyzer that adds metadata
struct MetadataEnricherAnalyzer {
    metadata: serde_json::Value,
}

impl MetadataEnricherAnalyzer {
    fn new(metadata: serde_json::Value) -> Self {
        Self { metadata }
    }
}

#[async_trait]
impl Analyzer for MetadataEnricherAnalyzer {
    async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
        let metadata = self.metadata.clone();
        let enriched_stream = stream.map(move |mut event| {
            if let Some(data) = event.data.as_object_mut() {
                data.insert("enriched_metadata".to_string(), metadata.clone());
            }
            event
        });

        Ok(Box::pin(enriched_stream))
    }
}

#[tokio::test]
async fn test_complex_analyzer_chain_composition() {
    let temp_file = NamedTempFile::new().unwrap();

    // Create a complex chain: Filter -> ChunkMerger -> Enrich -> FileLogger
    let mut runner = FakeRunner::new()
        .event_count(5) // 10 events total
        .delay_ms(10)
        .add_analyzer(Box::new(FilterAnalyzer::new("ssl_only".to_string())))
        .add_analyzer(Box::new(SSEProcessor::new_with_timeout(5000)))
        .add_analyzer(Box::new(MetadataEnricherAnalyzer::new(
            json!({"test_run": "complex_chain", "version": "1.0"}),
        )))
        .add_analyzer(Box::new(FileLogger::new(temp_file.path()).unwrap()));

    let stream = runner.run().await.unwrap();
    let events: Vec<_> = stream.collect().await;

    // Verify events passed through all analyzers
    assert!(!events.is_empty(), "Should have events");

    // All remaining events should be SSL (due to filter)
    let non_ssl_events = events
        .iter()
        .filter(|e| e.source != "ssl" && e.source != "sse_processor")
        .count();
    assert_eq!(non_ssl_events, 0, "Filter should remove non-SSL events");

    // Events should have enriched metadata
    let enriched_events = events
        .iter()
        .filter(|e| e.data.get("enriched_metadata").is_some())
        .count();
    assert!(enriched_events > 0, "Should have enriched events");

    // Verify sse processor events were created
    let _sse_events = events
        .iter()
        .filter(|e| e.source == "sse_processor")
        .count();
    // Note: sse_events might be 0 if no SSE data was processed

    // Verify file was written
    let file_size = std::fs::metadata(temp_file.path()).unwrap().len();
    assert!(file_size > 0, "Log file should have content");
}

#[tokio::test]
async fn test_analyzer_chain_error_resilience() {
    // Test that analyzer chain continues working even when individual analyzers encounter issues
    let mut runner = FakeRunner::new()
        .event_count(5)
        .delay_ms(10)
        .add_analyzer(Box::new(ErrorSimulatorAnalyzer::new(3))) // Error on 3rd event
        .add_analyzer(Box::new(SSEProcessor::new_with_timeout(5000)));

    let stream = runner.run().await.unwrap();
    let events: Vec<_> = stream.collect().await;

    // Should still process all events
    assert!(
        events.len() >= 10,
        "Should process all events despite simulated error"
    );

    // Check that error was marked on the 3rd event
    let error_events = events
        .iter()
        .filter(|e| e.data.get("analyzer_error").is_some())
        .count();
    assert!(
        error_events > 0,
        "Should have error markers from ErrorSimulator"
    );
}

#[tokio::test]
async fn test_analyzer_chain_concurrent_processing() {
    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Test multiple analyzer chains running concurrently
    let results = Arc::new(Mutex::new(Vec::new()));

    let mut handles = Vec::new();

    for i in 0..3 {
        let results_clone = Arc::clone(&results);
        let handle = tokio::spawn(async move {
            let mut runner = FakeRunner::new()
                .event_count(3)
                .delay_ms(5)
                .add_analyzer(Box::new(SSEProcessor::new_with_timeout(5000)));

            let stream = runner.run().await.unwrap();
            let events: Vec<_> = stream.collect().await;

            let mut results_guard = results_clone.lock().await;
            results_guard.push((i, events.len()));

            events.len()
        });
        handles.push(handle);
    }

    // Wait for all chains to complete
    let mut total_events = 0;
    for handle in handles {
        total_events += handle.await.unwrap();
    }

    let results_guard = results.lock().await;

    // All chains should have processed events
    assert_eq!(results_guard.len(), 3, "Should have 3 chain results");
    assert!(
        total_events >= 18,
        "Should have at least 18 events total (3 chains × 6 events)"
    );

    for (chain_id, event_count) in results_guard.iter() {
        assert!(
            *event_count >= 6,
            "Chain {} should have at least 6 events",
            chain_id
        );
    }
}

#[tokio::test]
async fn test_analyzer_chain_streaming_behavior() {
    // Test that events are processed in streaming fashion, not batched
    use std::sync::Arc;
    use std::time::Instant;
    use tokio::sync::Mutex;

    let event_timestamps = Arc::new(Mutex::new(Vec::new()));

    // Custom analyzer that records processing timestamps
    struct TimestampRecorderAnalyzer {
        timestamps: Arc<Mutex<Vec<(usize, Instant)>>>,
        counter: Arc<AtomicUsize>,
    }

    impl TimestampRecorderAnalyzer {
        fn new(timestamps: Arc<Mutex<Vec<(usize, Instant)>>>) -> Self {
            Self {
                timestamps,
                counter: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait]
    impl Analyzer for TimestampRecorderAnalyzer {
        async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
            let timestamps = self.timestamps.clone();
            let counter = self.counter.clone();

            let recorded_stream = stream.map(move |event| {
                let count = counter.fetch_add(1, Ordering::SeqCst);
                let timestamp = Instant::now();

                let timestamps_clone = timestamps.clone();
                tokio::spawn(async move {
                    let mut guard = timestamps_clone.lock().await;
                    guard.push((count, timestamp));
                });

                event
            });

            Ok(Box::pin(recorded_stream))
        }
    }

    let timestamps_clone = Arc::clone(&event_timestamps);

    let mut runner = FakeRunner::new()
        .event_count(5) // 10 events total
        .delay_ms(100) // 100ms delay to ensure streaming behavior is observable
        .add_analyzer(Box::new(TimestampRecorderAnalyzer::new(timestamps_clone)));

    let stream = runner.run().await.unwrap();
    let _: Vec<_> = stream.collect().await;

    // Wait a bit for async timestamp recording to complete
    tokio::time::sleep(Duration::from_millis(50)).await;

    let timestamps_guard = event_timestamps.lock().await;

    // Verify streaming behavior - events should arrive over time, not all at once
    assert!(
        timestamps_guard.len() >= 5,
        "Should have recorded multiple timestamps"
    );

    if timestamps_guard.len() >= 2 {
        let first_event_time = timestamps_guard[0].1;
        let last_event_time = timestamps_guard[timestamps_guard.len() - 1].1;
        let processing_span = last_event_time.duration_since(first_event_time);

        // Should take some time due to delays, indicating streaming behavior
        assert!(
            processing_span >= Duration::from_millis(50),
            "Events should be processed over time, not all at once"
        );
    }
}

#[tokio::test]
async fn test_analyzer_chain_backpressure_handling() {
    // Test analyzer chain behavior under backpressure conditions

    // Custom slow analyzer that simulates processing delays
    struct SlowAnalyzer {
        delay_ms: u64,
    }

    impl SlowAnalyzer {
        fn new(delay_ms: u64) -> Self {
            Self { delay_ms }
        }
    }

    #[async_trait]
    impl Analyzer for SlowAnalyzer {
        async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
            let delay = self.delay_ms;
            let slow_stream = stream.then(move |event| async move {
                tokio::time::sleep(Duration::from_millis(delay)).await;
                event
            });

            Ok(Box::pin(slow_stream))
        }
    }

    let start_time = Instant::now();

    let mut runner = FakeRunner::new()
        .event_count(3) // 6 events total
        .delay_ms(10) // Fast generation
        .add_analyzer(Box::new(SlowAnalyzer::new(50))); // Slow processing

    let stream = runner.run().await.unwrap();
    let events: Vec<_> = stream.collect().await;
    let total_time = start_time.elapsed();

    // Should process all events
    assert_eq!(
        events.len(),
        6,
        "Should process all events despite slow analyzer"
    );

    // Should take longer due to slow analyzer (at least 3 * 50ms = 150ms for processing)
    assert!(
        total_time >= Duration::from_millis(100),
        "Should take time due to slow analyzer processing"
    );
}

#[tokio::test]
async fn test_analyzer_chain_resource_cleanup() {
    // Test that resources are properly cleaned up after analyzer chain completion
    use std::sync::Arc;
    use tokio::sync::Mutex;

    // Custom analyzer that tracks resource allocation/cleanup
    struct ResourceTrackingAnalyzer {
        resources: Arc<Mutex<Vec<String>>>,
        id: String,
    }

    impl ResourceTrackingAnalyzer {
        fn new(id: String, resources: Arc<Mutex<Vec<String>>>) -> Self {
            Self { resources, id }
        }
    }

    impl Drop for ResourceTrackingAnalyzer {
        fn drop(&mut self) {}
    }

    #[async_trait]
    impl Analyzer for ResourceTrackingAnalyzer {
        async fn process(&mut self, stream: EventStream) -> Result<EventStream, AnalyzerError> {
            let resources = self.resources.clone();
            let id = self.id.clone();

            // Simulate resource allocation
            {
                let mut guard = resources.lock().await;
                guard.push(format!("resource_{}", id));
            }

            let processed_stream = stream.map(move |event| {
                // Simulate resource usage
                event
            });

            Ok(Box::pin(processed_stream))
        }
    }

    let resources = Arc::new(Mutex::new(Vec::new()));

    {
        let mut runner = FakeRunner::new()
            .event_count(2)
            .delay_ms(10)
            .add_analyzer(Box::new(ResourceTrackingAnalyzer::new(
                "test1".to_string(),
                Arc::clone(&resources),
            )))
            .add_analyzer(Box::new(ResourceTrackingAnalyzer::new(
                "test2".to_string(),
                Arc::clone(&resources),
            )));

        let stream = runner.run().await.unwrap();
        let events: Vec<_> = stream.collect().await;

        assert_eq!(events.len(), 4, "Should process all events");
    } // Runner and analyzers go out of scope here

    // Verify resources were allocated
    let resources_guard = resources.lock().await;
    assert_eq!(
        resources_guard.len(),
        2,
        "Should have allocated 2 resources"
    );
    assert!(resources_guard.contains(&"resource_test1".to_string()));
    assert!(resources_guard.contains(&"resource_test2".to_string()));
}