Skip to main content

oharness_core/
context.rs

1//! Context-plumbing traits (§4.6). Defined here so downstream crates can depend only
2//! on `oharness-core` when threading observability / budget / approval / cancellation.
3
4use crate::event::{Event, EventKind};
5use crate::ids::{RunId, SpanId};
6use async_trait::async_trait;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
/// Emit events to wherever the harness routes them.
///
/// Implementations provide BOTH methods explicitly — there are no default bodies —
/// because the two make different promises: `emit` blocks on backpressure, while
/// `try_emit` never blocks. The split lets callers pick the right behavior at each
/// call site.
///
/// NOTE: `try_emit` hands the rejected event back in `Err` so callers can retry or
/// log it. That makes the `Err` variant larger than clippy's `result_large_err`
/// lint likes; the lint is suppressed because returning ownership of the event is
/// load-bearing to the contract.
#[allow(clippy::result_large_err)]
pub trait EventSink: Send + Sync {
    /// Blocking-as-specified emit. Takes ownership of `event`.
    ///
    /// - Default shipped sinks: `try_send` first; on `Full`, block via
    ///   `tokio::task::spawn_blocking` so a tokio worker thread is not stalled.
    /// - `NullSink`: discards unconditionally.
    /// - Call from `Drop`: callers SHOULD use `try_emit` instead.
    fn emit(&self, event: Event);

    /// Non-blocking variant. Returns `Ok(())` when the event was accepted and
    /// `Err(event)` immediately on a full channel, so the caller can decide policy
    /// (drop, buffer locally, log). Never blocks, never spawns.
    fn try_emit(&self, event: Event) -> Result<(), Event>;
}
36
/// Event sink that throws every event away without recording it.
///
/// This is the default sink for `AgentBuilder`: a library call must not write to
/// `cwd` implicitly, so nothing is persisted unless the caller opts in.
#[derive(Clone, Debug, Default)]
pub struct NullSink;
41
42impl EventSink for NullSink {
43    fn emit(&self, _event: Event) {}
44    fn try_emit(&self, _event: Event) -> Result<(), Event> {
45        Ok(())
46    }
47}
48
/// Budget state accessible from tools and middleware.
///
/// Implementations must be `Send + Sync`. See `NullBudget` for the always-allow
/// implementation.
#[async_trait]
pub trait BudgetHandle: Send + Sync {
    /// Evaluate a pre-call estimate and return whether the call is allowed
    /// (`BudgetDecision::Allow`) or refused (`BudgetDecision::Deny` with a reason).
    async fn check(&self, request: BudgetRequest) -> BudgetDecision;
    /// Report an `amount` of budget as consumed.
    /// NOTE(review): presumably called after the work has happened, with actual
    /// rather than estimated figures — confirm against callers.
    async fn consume(&self, amount: BudgetAmount);
    /// Synchronously return the current consumed / remaining figures.
    fn snapshot(&self) -> BudgetSnapshot;
}
56
/// A quantity of budget, broken down by dimension. Passed to
/// `BudgetHandle::consume` and used for the `consumed` / `remaining` fields of
/// `BudgetSnapshot`. `Default` yields all-zero values.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BudgetAmount {
    /// Number of input tokens.
    pub tokens_input: u64,
    /// Number of output tokens.
    pub tokens_output: u64,
    /// Monetary cost — presumably US dollars, going by the field name.
    pub cost_usd: f64,
    /// Wall-clock time. Routed through the `duration_seconds` adapter, so on the
    /// wire it is a floating-point number of seconds.
    #[serde(with = "crate::context::duration_seconds")]
    pub wall_clock: Duration,
    /// Number of steps.
    /// NOTE(review): what constitutes a "step" is not defined in this file.
    pub steps: u32,
}
66
/// A pre-call estimate supplied to `BudgetHandle::check`. Fields are all optional —
/// callers populate what they can estimate and leave the rest as `None`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BudgetRequest {
    /// Estimated input tokens for the upcoming call, if known.
    pub estimated_input_tokens: Option<u64>,
    /// Estimated output tokens for the upcoming call, if known.
    pub estimated_output_tokens: Option<u64>,
    /// Estimated cost for the upcoming call, if known (presumably US dollars, per
    /// the field name).
    pub estimated_cost_usd: Option<f64>,
    /// Optional free-form label. Omitted from serialized output when `None`, and
    /// defaults to `None` when absent on input.
    /// NOTE(review): intended use (logging? per-label accounting?) is not shown here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
}
77
/// Outcome of `BudgetHandle::check`.
///
/// Serialized as an internally tagged object keyed by `decision` with snake_case
/// variant names, e.g. `{"decision":"allow"}` or
/// `{"decision":"deny","reason":"..."}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "decision", rename_all = "snake_case")]
pub enum BudgetDecision {
    /// The request may proceed.
    Allow,
    /// The request is refused; `reason` explains why.
    Deny { reason: String },
}
84
/// Point-in-time view of a budget, as returned by `BudgetHandle::snapshot`.
/// `Default` is zero consumed and no `remaining` figure.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct BudgetSnapshot {
    /// What has been consumed so far.
    pub consumed: BudgetAmount,
    /// What is left, when known.
    /// NOTE(review): `None` presumably means "no limit configured" — confirm.
    pub remaining: Option<BudgetAmount>,
}
90
/// Budget implementation that performs no accounting and allows every request.
/// This is the default in `AgentBuilder`.
#[derive(Clone, Debug, Default)]
pub struct NullBudget;
94
95#[async_trait]
96impl BudgetHandle for NullBudget {
97    async fn check(&self, _request: BudgetRequest) -> BudgetDecision {
98        BudgetDecision::Allow
99    }
100    async fn consume(&self, _amount: BudgetAmount) {}
101    fn snapshot(&self) -> BudgetSnapshot {
102        BudgetSnapshot::default()
103    }
104}
105
/// Approval channel for tool calls needing human/external OK.
///
/// Implementations must be `Send + Sync`. See `NullApprovalChannel` for the
/// allow-everything implementation.
#[async_trait]
pub trait ApprovalChannel: Send + Sync {
    /// Submit `req` for approval and wait for the verdict
    /// (`ApprovalResponse::Allow` or `ApprovalResponse::Deny`).
    async fn request(&self, req: ApprovalRequest) -> ApprovalResponse;
}
111
/// What is sent to an `ApprovalChannel` when a tool call needs sign-off.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApprovalRequest {
    /// Name of the tool whose call is awaiting approval.
    pub tool_name: String,
    /// The tool call's input as arbitrary JSON.
    pub input: Value,
    /// Free-form reason text.
    /// NOTE(review): presumably explains why approval is required — confirm.
    pub reason: String,
}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
120#[serde(tag = "kind", rename_all = "snake_case")]
121pub enum ApprovalResponse {
122    Allow,
123    Deny(String),
124}
125
/// Approval channel that grants every request. Intended as the default for
/// non-interactive contexts; CLI/UI surfaces supply interactive variants instead.
#[derive(Clone, Debug, Default)]
pub struct NullApprovalChannel;
130
131#[async_trait]
132impl ApprovalChannel for NullApprovalChannel {
133    async fn request(&self, _req: ApprovalRequest) -> ApprovalResponse {
134        ApprovalResponse::Allow
135    }
136}
137
/// Stable alias for the cancellation primitive, currently
/// `tokio_util::sync::CancellationToken`. Code should name `Cancellation` rather
/// than the underlying type so that, if `tokio-util`'s token is ever replaced, only
/// this line changes instead of every `use` site.
pub type Cancellation = tokio_util::sync::CancellationToken;
141
/// Error returned when event-payload construction rejects an invalid namespace for
/// `user.log` events (§4.7).
#[derive(Debug, thiserror::Error)]
pub enum NamespaceError {
    /// The namespace (carried in the payload) clashes with a built-in
    /// event-category prefix.
    #[error("`user.log` namespace `{0}` collides with a built-in event-category prefix")]
    BuiltinCollision(String),
    /// The namespace was empty.
    #[error("`user.log` namespace must be non-empty")]
    Empty,
}
151
/// Convenient alias for an `Arc`-wrapped, dynamically dispatched [`EventSink`] —
/// the type every subsystem receives.
pub type SharedSink = Arc<dyn EventSink>;
154
/// Run-scoped event emitter. Subsystems (tools, memory, middleware) receive one of
/// these so they don't need to know about run_id or the monotonic seq counter — they
/// just call `.emit(span_id, kind, parent)`.
///
/// The plan's context types show `events: Arc<dyn EventSink>` alone; in practice the
/// envelope needs a `seq` and `run_id` that the loop owns. This wrapper closes that
/// gap without changing the `EventSink` trait.
///
/// Cloning copies the handles: clones point at the same sink and the same sequence
/// counter (both are held behind `Arc`).
#[derive(Clone)]
pub struct ScopedEmitter {
    /// Where stamped events are delivered.
    sink: Arc<dyn EventSink>,
    /// Run identifier stamped onto every event built by this emitter.
    run_id: RunId,
    /// Shared counter from which each event's `seq` is drawn.
    seq: Arc<AtomicU64>,
}
168
169impl ScopedEmitter {
170    pub fn new(sink: Arc<dyn EventSink>, run_id: RunId, seq: Arc<AtomicU64>) -> Self {
171        Self { sink, run_id, seq }
172    }
173
174    pub fn sink(&self) -> &Arc<dyn EventSink> {
175        &self.sink
176    }
177
178    pub fn run_id(&self) -> RunId {
179        self.run_id
180    }
181
182    pub fn next_seq(&self) -> u64 {
183        self.seq.fetch_add(1, Ordering::SeqCst)
184    }
185
186    /// Build and emit a stamped event (blocks on backpressure per `EventSink::emit`).
187    /// Returns the event's `seq` so spans can be linked.
188    pub fn emit(&self, span_id: impl Into<SpanId>, kind: EventKind, parent: Option<u64>) -> u64 {
189        let seq = self.next_seq();
190        let mut event = Event::new(seq, self.run_id, span_id, kind);
191        event.parent = parent;
192        self.sink.emit(event);
193        seq
194    }
195
196    /// Non-blocking emit. On backpressure, the event is dropped and `Err(seq)` is
197    /// returned so the caller can log or retry.
198    pub fn try_emit(
199        &self,
200        span_id: impl Into<SpanId>,
201        kind: EventKind,
202        parent: Option<u64>,
203    ) -> Result<u64, u64> {
204        let seq = self.next_seq();
205        let mut event = Event::new(seq, self.run_id, span_id, kind);
206        event.parent = parent;
207        match self.sink.try_emit(event) {
208            Ok(()) => Ok(seq),
209            Err(_) => Err(seq),
210        }
211    }
212}
213
214mod duration_seconds {
215    use serde::{Deserialize, Deserializer, Serializer};
216    use std::time::Duration;
217
218    pub fn serialize<S: Serializer>(d: &Duration, s: S) -> Result<S::Ok, S::Error> {
219        s.serialize_f64(d.as_secs_f64())
220    }
221
222    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Duration, D::Error> {
223        let secs = f64::deserialize(d)?;
224        Ok(Duration::from_secs_f64(secs))
225    }
226}