bamboo-engine 2026.4.30

Execution engine and orchestration for the Bamboo agent framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
//! Metrics Event System
//!
//! This module defines the event types used for tracking agent performance and behavior.
//! The event system follows a unified architecture that supports both internal agent operations
//! and HTTP proxy forwarding.
//!
//! # Event Categories
//!
//! The system organizes metrics into three distinct categories:
//!
//! ## Chat Events (`ChatEvent`)
//! Events emitted by the agent's internal chat loop during conversation sessions.
//! These track the lifecycle of sessions, rounds, and tool invocations.
//!
//! ## Forward Events (`ForwardEvent`)
//! Events emitted when forwarding requests to upstream API providers (e.g., OpenAI, Anthropic).
//! These track HTTP proxy operations for external API calls.
//!
//! ## System Events (`SystemEvent`)
//! Operational events for monitoring the metrics system itself, including error tracking
//! and worker lifecycle management.
//!
//! # Architecture
//!
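//! Chat and forward events carry common metadata (`EventMeta`) that includes:
//! - Unique event identifier (a UUID v4 string when built via its constructors)
//! - UTC timestamp of occurrence
//! - Optional trace ID for distributed request tracing
//!
//! System events are lightweight operational signals and do not carry `EventMeta`.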
//!
//! Events are designed to be:
//! - **Serializable**: All events implement `Serialize` and `Deserialize` for persistence
//! - **Immutable**: Once created, events should not be modified (a convention only; their fields are public)
//! - **Traceable**: Chat and forward events can be linked via trace IDs across system boundaries

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::metrics::types::{ForwardStatus, RoundStatus, SessionStatus, TokenUsage};

/// Metadata attached to every chat and forward metrics event.
///
/// Every [`ChatEvent`] and [`ForwardEvent`] variant carries one of these in
/// its `meta` field; [`SystemEvent`] variants do not.
///
/// # Fields
///
/// - `event_id`: Identifier for this event; the constructors fill it with a random UUID v4 string
/// - `occurred_at`: UTC timestamp of the event; the constructors set it to the creation time
/// - `trace_id`: Optional identifier for correlating related events across services
///
/// Because the fields are plain public values with derived serde impls,
/// deserialization does not validate them (e.g. `event_id` may be any string).
/// serde serializes fields in declaration order, so append new fields rather
/// than reordering existing ones.
///
/// # Example
///
/// ```rust,ignore
/// // Create event metadata without trace ID
/// let meta = EventMeta::new();
/// assert!(!meta.event_id.is_empty());
/// assert!(meta.trace_id.is_none());
///
/// // Create event metadata with trace ID for distributed tracing
/// let traced_meta = EventMeta::with_trace_id("req-abc-123");
/// assert_eq!(traced_meta.trace_id, Some("req-abc-123".to_string()));
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMeta {
    /// Unique event ID (a UUID v4 string when created via `new`/`with_trace_id`)
    pub event_id: String,
    /// When the event occurred (UTC)
    pub occurred_at: DateTime<Utc>,
    /// Optional trace ID for correlating request chains
    pub trace_id: Option<String>,
}

impl EventMeta {
    /// Builds metadata stamped with a random UUID v4 and the current UTC time,
    /// with no trace ID attached.
    ///
    /// Reach for [`EventMeta::with_trace_id`] instead when the event has to be
    /// correlated with other events belonging to the same request chain.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let meta = EventMeta::new();
    /// assert!(!meta.event_id.is_empty());
    /// assert!(meta.trace_id.is_none());
    /// ```
    pub fn new() -> Self {
        let event_id = Uuid::new_v4().to_string();
        let occurred_at = Utc::now();
        Self {
            event_id,
            occurred_at,
            trace_id: None,
        }
    }

    /// Builds metadata exactly like [`EventMeta::new`], but tagged with `trace_id`.
    ///
    /// # Arguments
    ///
    /// * `trace_id` - Identifier shared by every event in one logical request
    ///   chain; anything convertible into a `String` is accepted.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let meta = EventMeta::with_trace_id("req-12345");
    /// assert_eq!(meta.trace_id, Some("req-12345".to_string()));
    /// ```
    pub fn with_trace_id(trace_id: impl Into<String>) -> Self {
        // Reuse `new()` for the ID and timestamp; only the trace ID differs.
        Self {
            trace_id: Some(trace_id.into()),
            ..Self::new()
        }
    }
}

impl Default for EventMeta {
    fn default() -> Self {
        Self::new()
    }
}

/// Unified metrics event enum encompassing all event categories.
///
/// This enum serves as the top-level container for all metrics events
/// in the system. It provides a single type that can be serialized,
/// deserialized, and processed uniformly.
///
/// # Variants
///
/// - `Chat`: Events from the internal agent chat loop
/// - `Forward`: Events from HTTP proxy operations to upstream APIs
/// - `System`: Operational events for system monitoring
///
/// # Serialization
///
/// The derived serde impls use serde's default externally tagged enum
/// representation, so a chat event serializes to JSON as
/// `{"Chat":{"SessionStarted":{...}}}`. Variant names are therefore part of
/// the wire format: renaming a variant breaks deserialization of previously
/// stored events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricsEvent {
    /// An event from the agent's internal chat loop (see [`ChatEvent`]).
    Chat(ChatEvent),
    /// An event from forwarding a request to an upstream API (see [`ForwardEvent`]).
    Forward(ForwardEvent),
    /// An operational event about the metrics system itself (see [`SystemEvent`]).
    System(SystemEvent),
}

// ============================================================================
// Chat Events (Agent-internal usage)
// ============================================================================

/// Events emitted by the agent loop during chat sessions.
///
/// These events track the complete lifecycle of chat interactions within
/// the agent, from session initialization to completion, including all
/// intermediate operations like rounds and tool calls.
///
/// Every variant carries the owning `session_id`; round-scoped variants
/// (`RoundStarted`, `RoundCompleted`, `ToolCalled`) additionally carry a
/// `round_id`, so related events can be correlated by these IDs.
///
/// # Event Lifecycle
///
/// A typical chat session is expected to produce events in this order
/// (a convention of the emitting agent loop; this type does not enforce it):
/// 1. `SessionStarted` - When a new conversation begins
/// 2. `RoundStarted` - When the agent begins processing a message
/// 3. `ToolCalled` (multiple) - For each tool invocation during the round
/// 4. `RoundCompleted` - When the agent finishes processing the message
/// 5. `MessageCountUpdated` - When the message count changes
/// 6. `SessionCompleted` - When the conversation ends
///
/// # Usage
///
/// These events are primarily used for:
/// - Performance monitoring (latency tracking)
/// - Resource usage analysis (token consumption)
/// - Behavior analytics (tool usage patterns)
/// - Error tracking and debugging
///
/// Variant and field names appear verbatim in the serialized form (serde's
/// default externally tagged representation), so renaming them is a
/// breaking change for stored events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChatEvent {
    /// Emitted when a new chat session is initialized.
    ///
    /// This marks the beginning of a conversation between the user and agent.
    SessionStarted {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for this chat session
        session_id: String,
        /// The AI model being used for this session (e.g., "gpt-4", "claude-3")
        model: String,
    },

    /// Emitted when a chat session completes or terminates.
    ///
    /// This marks the end of a conversation, whether successful or due to an error.
    SessionCompleted {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for the chat session that completed
        session_id: String,
        /// Final status of the session (completed, failed, or cancelled)
        status: SessionStatus,
    },

    /// Emitted when the agent starts processing a new message round.
    ///
    /// A round represents a single request-response cycle within a session.
    /// Each user message typically triggers one round.
    RoundStarted {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for this round
        round_id: String,
        /// Session this round belongs to
        session_id: String,
        /// The AI model being used for this round
        model: String,
    },

    /// Emitted when the agent completes processing a message round.
    ///
    /// Contains comprehensive metrics about the round including token usage,
    /// latency, and any errors that occurred. The model is not repeated here;
    /// correlate with the matching `RoundStarted` via `round_id` if needed.
    RoundCompleted {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for the round that completed
        round_id: String,
        /// Session this round belongs to
        session_id: String,
        /// Final status of the round (success or failed)
        status: RoundStatus,
        /// Token consumption during this round
        usage: TokenUsage,
        /// Total time to process the round in milliseconds
        latency_ms: u64,
        /// Error message if the round failed, None on success
        error: Option<String>,
    },

    /// Emitted when the agent invokes a tool during a round.
    ///
    /// Tracks tool execution for understanding agent behavior and
    /// measuring tool performance.
    ToolCalled {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for this tool invocation
        tool_call_id: String,
        /// Round this tool call belongs to
        round_id: String,
        /// Session this tool call belongs to
        session_id: String,
        /// Name of the tool being invoked (e.g., "read_file", "execute_command")
        tool_name: String,
        /// Time taken to execute the tool in milliseconds
        latency_ms: u64,
        /// Whether the tool execution succeeded
        success: bool,
    },

    /// Emitted when the message count for a session is updated.
    ///
    /// This tracks the total number of messages in the session
    /// (presumably counting both user and assistant messages — confirm
    /// against the emitting code).
    MessageCountUpdated {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Session whose message count was updated
        session_id: String,
        /// New total message count for the session (an absolute value, not a delta)
        message_count: u32,
    },
}

// ============================================================================
// Forward Events (HTTP proxy)
// ============================================================================

/// Events emitted when forwarding requests to upstream APIs.
///
/// These events track HTTP proxy operations when the system forwards
/// requests to external API providers like OpenAI, Anthropic, or other
/// LLM services. They enable monitoring of API usage, costs, and performance.
///
/// # Event Lifecycle
///
/// A typical forward operation produces events in this order:
/// 1. `RequestStarted` - When a request is initiated to the upstream API
/// 2. `RequestCompleted` - When the response is received (or an error occurs)
///
/// Both events carry the same `request_id`, which is the only field linking
/// a completion back to its start.
///
/// # Use Cases
///
/// These events support:
/// - API cost tracking via token consumption
/// - Performance monitoring (latency, success rates)
/// - Error rate analysis per endpoint
/// - Load balancing and capacity planning
/// - Compliance and audit logging
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ForwardEvent {
    /// Emitted when a request is initiated to an upstream API.
    ///
    /// This marks the beginning of a forwarded HTTP request to an
    /// external service provider.
    RequestStarted {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for this forwarded request
        request_id: String,
        /// Endpoint identifier (e.g., "openai.chat_completions" or "anthropic.messages")
        endpoint: String,
        /// The AI model being requested (e.g., "gpt-4", "claude-3-opus")
        model: String,
        /// Whether this is a streaming request (SSE) or regular request
        is_stream: bool,
    },

    /// Emitted when a forwarded request completes or fails.
    ///
    /// Contains comprehensive information about the request outcome,
    /// including HTTP status, token usage, latency, and any errors.
    /// It does not repeat `endpoint`, `model`, or `is_stream`; correlate it
    /// with the matching `RequestStarted` event via `request_id`.
    RequestCompleted {
        /// Event metadata including unique ID and timestamp
        meta: EventMeta,
        /// Unique identifier for the request that completed
        request_id: String,
        /// HTTP status code returned by the upstream API.
        /// NOTE(review): when no HTTP response was received (e.g. a timeout),
        /// the value stored here is chosen by the emitter — confirm.
        status_code: u16,
        /// Status classification (success, error, or timeout)
        status: ForwardStatus,
        /// Token usage if available from the API response
        usage: Option<TokenUsage>,
        /// Total time for the request in milliseconds
        latency_ms: u64,
        /// Error message if the request failed, None on success
        error: Option<String>,
    },
}

// ============================================================================
// System Events
// ============================================================================

/// System-level events for operational metrics and monitoring.
///
/// These events track the health and operation of the metrics system itself,
/// providing visibility into system behavior, errors, and resource management.
/// They are primarily used for operational monitoring and debugging.
///
/// Unlike [`ChatEvent`] and [`ForwardEvent`], these variants carry no
/// [`EventMeta`], so they have no event ID, timestamp, or trace ID of their own.
///
/// # Use Cases
///
/// - Monitoring system health and stability
/// - Tracking metrics collection reliability
/// - Identifying storage or processing issues
/// - Understanding worker lifecycle
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SystemEvent {
    /// Emitted when metrics events are dropped due to errors or resource constraints.
    ///
    /// This typically indicates system stress or configuration issues
    /// that should be investigated.
    MetricsDropped {
        /// Number of events that were dropped
        count: u64,
        /// Human-readable explanation of why the events were dropped
        reason: String,
    },

    /// Emitted when a storage operation fails.
    ///
    /// Tracks errors in persisting metrics to the storage backend,
    /// helping identify database issues or data quality problems.
    StorageError {
        /// Description of the storage error that occurred
        error: String,
        /// Type of metrics event that failed to be stored
        event_type: String,
    },

    /// Emitted when a metrics processing worker starts.
    ///
    /// Indicates that a new worker thread/task has begun processing
    /// metrics events from the event queue. As a unit variant it
    /// serializes to JSON as the bare string `"WorkerStarted"`.
    WorkerStarted,

    /// Emitted when a metrics processing worker stops.
    ///
    /// Indicates that a worker has shut down, either gracefully or
    /// due to an error. Used to track worker lifecycle and system capacity.
    /// Note that the variant itself does not record which of the two occurred.
    WorkerStopped,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `event` as JSON and decodes it again, panicking if either step fails.
    fn json_round_trip(event: &MetricsEvent) -> MetricsEvent {
        let encoded = serde_json::to_string(event).expect("serialize");
        serde_json::from_str(&encoded).expect("deserialize")
    }

    #[test]
    fn test_event_meta_new() {
        let untraced = EventMeta::new();
        assert!(!untraced.event_id.is_empty());
        assert!(untraced.trace_id.is_none());
    }

    #[test]
    fn test_event_meta_with_trace_id() {
        let traced = EventMeta::with_trace_id("trace-123");
        assert!(!traced.event_id.is_empty());
        assert_eq!(traced.trace_id.as_deref(), Some("trace-123"));
    }

    #[test]
    fn test_chat_event_serialization() {
        let original = MetricsEvent::Chat(ChatEvent::SessionStarted {
            meta: EventMeta::new(),
            session_id: "session-123".to_string(),
            model: "gpt-4".to_string(),
        });

        if let MetricsEvent::Chat(ChatEvent::SessionStarted {
            session_id, model, ..
        }) = json_round_trip(&original)
        {
            assert_eq!(session_id, "session-123");
            assert_eq!(model, "gpt-4");
        } else {
            panic!("Expected SessionStarted event");
        }
    }

    #[test]
    fn test_forward_event_serialization() {
        let original = MetricsEvent::Forward(ForwardEvent::RequestStarted {
            meta: EventMeta::new(),
            request_id: "req-456".to_string(),
            endpoint: "openai.chat_completions".to_string(),
            model: "gpt-5-mini".to_string(),
            is_stream: true,
        });

        if let MetricsEvent::Forward(ForwardEvent::RequestStarted {
            request_id,
            endpoint,
            model,
            is_stream,
            ..
        }) = json_round_trip(&original)
        {
            assert_eq!(request_id, "req-456");
            assert_eq!(endpoint, "openai.chat_completions");
            assert_eq!(model, "gpt-5-mini");
            assert!(is_stream);
        } else {
            panic!("Expected RequestStarted event");
        }
    }
}