1use crate::event::{Event, EventKind};
5use crate::ids::{RunId, SpanId};
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
/// A thread-safe (`Send + Sync`) destination for [`Event`]s.
///
/// Two delivery methods are offered: [`emit`](EventSink::emit), which reports
/// nothing back to the caller, and [`try_emit`](EventSink::try_emit), which
/// returns the event to the caller when it was not accepted.
// `try_emit` returns the rejected `Event` by value in `Err`, which is what the
// silenced lint complains about. NOTE(review): presumably `Event` is large
// enough to trigger it — confirm before removing the `allow`.
#[allow(clippy::result_large_err)]
pub trait EventSink: Send + Sync {
    /// Hands `event` to the sink. The caller gets no success/failure signal.
    fn emit(&self, event: Event);

    /// Tries to hand `event` to the sink.
    ///
    /// # Errors
    ///
    /// Returns the event back in `Err` when the sink did not accept it, so the
    /// caller keeps ownership. The conditions under which a sink rejects an
    /// event are implementation-defined.
    fn try_emit(&self, event: Event) -> Result<(), Event>;
}
36
/// Zero-sized [`EventSink`] placeholder; its trait implementation discards
/// every event it is given.
#[derive(Debug, Default, Clone)]
pub struct NullSink;
41
/// No-op sink: events are dropped and delivery always reports success.
impl EventSink for NullSink {
    /// Drops the event without doing anything with it.
    fn emit(&self, _event: Event) {}
    /// Drops the event and always returns `Ok(())`; never hands it back.
    fn try_emit(&self, _event: Event) -> Result<(), Event> {
        Ok(())
    }
}
48
/// Thread-safe (`Send + Sync`) handle for budget accounting.
///
/// NOTE(review): the intended protocol appears to be `check` before doing
/// work and `consume` afterwards — confirm against callers.
#[async_trait]
pub trait BudgetHandle: Send + Sync {
    /// Evaluates `request` and returns whether it is allowed or denied.
    async fn check(&self, request: BudgetRequest) -> BudgetDecision;
    /// Reports `amount` as used. Returns nothing, so it cannot signal failure.
    async fn consume(&self, amount: BudgetAmount);
    /// Returns the current consumed/remaining view synchronously.
    fn snapshot(&self) -> BudgetSnapshot;
}
56
/// A quantity of budget across several dimensions. `Default` yields all zeros.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BudgetAmount {
    /// Input-token count.
    pub tokens_input: u64,
    /// Output-token count.
    pub tokens_output: u64,
    /// Monetary cost; the field name indicates US dollars.
    pub cost_usd: f64,
    /// Elapsed wall-clock time, (de)serialized through
    /// `crate::context::duration_seconds`.
    // NOTE(review): presumably this is the `duration_seconds` module defined in
    // this file, which encodes the duration as fractional seconds — confirm.
    #[serde(with = "crate::context::duration_seconds")]
    pub wall_clock: Duration,
    /// Step count.
    pub steps: u32,
}
66
/// Input to [`BudgetHandle::check`]: optional estimates for upcoming work.
/// `Default` leaves every field `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BudgetRequest {
    /// Estimated input tokens, if known.
    pub estimated_input_tokens: Option<u64>,
    /// Estimated output tokens, if known.
    pub estimated_output_tokens: Option<u64>,
    /// Estimated cost (US dollars per the field name), if known.
    pub estimated_cost_usd: Option<f64>,
    /// Optional free-form label. Omitted from serialized output when `None`
    /// and defaulted to `None` when absent on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
77
/// Outcome of [`BudgetHandle::check`].
///
/// Serialized as an internally tagged enum: the variant name (snake_case) is
/// stored under the `decision` key, e.g. `{"decision":"allow"}` or
/// `{"decision":"deny","reason":"..."}` in JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "decision", rename_all = "snake_case")]
pub enum BudgetDecision {
    /// The request is allowed.
    Allow,
    /// The request is denied; `reason` carries the explanation text.
    Deny { reason: String },
}
84
/// Point-in-time view returned by [`BudgetHandle::snapshot`].
/// `Default` is a zero `consumed` amount with `remaining` set to `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BudgetSnapshot {
    /// Amount used so far.
    pub consumed: BudgetAmount,
    /// Amount still available, when one is reported.
    // NOTE(review): `None` presumably means "no limit configured" — confirm.
    pub remaining: Option<BudgetAmount>,
}
90
/// Zero-sized [`BudgetHandle`] placeholder; its trait implementation allows
/// every request and tracks nothing.
#[derive(Debug, Default, Clone)]
pub struct NullBudget;
94
/// No-op budget: never denies, never records consumption.
#[async_trait]
impl BudgetHandle for NullBudget {
    /// Ignores the request and always returns [`BudgetDecision::Allow`].
    async fn check(&self, _request: BudgetRequest) -> BudgetDecision {
        BudgetDecision::Allow
    }
    /// Discards the reported amount.
    async fn consume(&self, _amount: BudgetAmount) {}
    /// Always returns the default snapshot, since nothing is tracked.
    fn snapshot(&self) -> BudgetSnapshot {
        BudgetSnapshot::default()
    }
}
105
/// Thread-safe (`Send + Sync`) channel through which an approval decision is
/// obtained asynchronously.
#[async_trait]
pub trait ApprovalChannel: Send + Sync {
    /// Submits `req` and resolves to the decision. The return type has no
    /// error case, so implementations must map any failure onto a response.
    async fn request(&self, req: ApprovalRequest) -> ApprovalResponse;
}
111
/// Payload passed to [`ApprovalChannel::request`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalRequest {
    /// Name of the tool the request concerns.
    pub tool_name: String,
    /// Arbitrary JSON value associated with the request.
    // NOTE(review): presumably the input the tool would be invoked with — confirm.
    pub input: Value,
    /// Free-form reason text accompanying the request.
    pub reason: String,
}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
120#[serde(tag = "kind", rename_all = "snake_case")]
121pub enum ApprovalResponse {
122 Allow,
123 Deny(String),
124}
125
/// Zero-sized [`ApprovalChannel`] placeholder; its trait implementation
/// approves every request.
#[derive(Debug, Default, Clone)]
pub struct NullApprovalChannel;
130
/// Auto-approving channel: the request contents are never inspected.
#[async_trait]
impl ApprovalChannel for NullApprovalChannel {
    /// Ignores `_req` and always returns [`ApprovalResponse::Allow`].
    async fn request(&self, _req: ApprovalRequest) -> ApprovalResponse {
        ApprovalResponse::Allow
    }
}
137
/// Alias for `tokio_util`'s `CancellationToken`, so users of this module can
/// name the cancellation type without referring to `tokio_util` directly.
pub type Cancellation = tokio_util::sync::CancellationToken;
141
/// Reasons a `user.log` namespace is rejected (see the variant messages).
#[derive(Debug, thiserror::Error)]
pub enum NamespaceError {
    /// The namespace (carried in the payload) clashes with a built-in
    /// event-category prefix.
    #[error("`user.log` namespace `{0}` collides with a built-in event-category prefix")]
    BuiltinCollision(String),
    /// The namespace was empty.
    #[error("`user.log` namespace must be non-empty")]
    Empty,
}
151
/// Reference-counted, dynamically dispatched [`EventSink`] handle.
pub type SharedSink = Arc<dyn EventSink>;
154
/// Emits events for one run, stamping each with that run's id and a sequence
/// number drawn from a shared counter.
///
/// Cloning copies the `Arc` handles, so clones write to the same sink and draw
/// from the same counter.
#[derive(Clone)]
pub struct ScopedEmitter {
    /// Sink that receives the built events.
    sink: Arc<dyn EventSink>,
    /// Run id stamped onto every event built by this emitter.
    run_id: RunId,
    /// Shared sequence counter; each emission takes the next value.
    seq: Arc<AtomicU64>,
}
168
169impl ScopedEmitter {
170 pub fn new(sink: Arc<dyn EventSink>, run_id: RunId, seq: Arc<AtomicU64>) -> Self {
171 Self { sink, run_id, seq }
172 }
173
174 pub fn sink(&self) -> &Arc<dyn EventSink> {
175 &self.sink
176 }
177
178 pub fn run_id(&self) -> RunId {
179 self.run_id
180 }
181
182 pub fn next_seq(&self) -> u64 {
183 self.seq.fetch_add(1, Ordering::SeqCst)
184 }
185
186 pub fn emit(&self, span_id: impl Into<SpanId>, kind: EventKind, parent: Option<u64>) -> u64 {
189 let seq = self.next_seq();
190 let mut event = Event::new(seq, self.run_id, span_id, kind);
191 event.parent = parent;
192 self.sink.emit(event);
193 seq
194 }
195
196 pub fn try_emit(
199 &self,
200 span_id: impl Into<SpanId>,
201 kind: EventKind,
202 parent: Option<u64>,
203 ) -> Result<u64, u64> {
204 let seq = self.next_seq();
205 let mut event = Event::new(seq, self.run_id, span_id, kind);
206 event.parent = parent;
207 match self.sink.try_emit(event) {
208 Ok(()) => Ok(seq),
209 Err(_) => Err(seq),
210 }
211 }
212}
213
214mod duration_seconds {
215 use serde::{Deserialize, Deserializer, Serializer};
216 use std::time::Duration;
217
218 pub fn serialize<S: Serializer>(d: &Duration, s: S) -> Result<S::Ok, S::Error> {
219 s.serialize_f64(d.as_secs_f64())
220 }
221
222 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
223 let secs = f64::deserialize(d)?;
224 Ok(Duration::from_secs_f64(secs))
225 }
226}