layer0/operator.rs
1//! The Operator protocol — what one operator does per cycle.
2
3use crate::{content::Content, duration::DurationMs, effect::Effect, error::OperatorError, id::*};
4use async_trait::async_trait;
5use rust_decimal::Decimal;
6use serde::{Deserialize, Serialize};
7
8/// What triggers an operator invocation. Informs context assembly — a scheduled trigger
9/// means you need to reconstruct everything from state, while a user
10/// message carries conversation context naturally.
11#[non_exhaustive]
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13#[serde(rename_all = "snake_case")]
14pub enum TriggerType {
15 /// Human sent a message.
16 User,
17 /// Another agent assigned a task.
18 Task,
19 /// Signal from another workflow/agent.
20 Signal,
21 /// Cron/schedule triggered.
22 Schedule,
23 /// System event (file change, webhook, etc.).
24 SystemEvent,
25 /// Future trigger types.
26 Custom(String),
27}
28
29/// Input to an operator. Everything the operator needs to execute.
30///
31/// Design decision: OperatorInput does NOT include conversation history
32/// or memory contents. The operator runtime reads those from a StateStore
33/// during context assembly. OperatorInput carries the *new* information
34/// that triggered this invocation — not the accumulated state.
35///
36/// This keeps the protocol boundary clean: the caller provides what's
37/// new, the operator runtime decides how to assemble context from what's
38/// new + what's stored.
39#[non_exhaustive]
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct OperatorInput {
42 /// The new message/task/signal that triggered this operator invocation.
43 pub message: Content,
44
45 /// What caused this operator invocation to start.
46 pub trigger: TriggerType,
47
48 /// Session for conversation continuity. If None, the operator is stateless.
49 /// The operator runtime uses this to read history from the StateStore.
50 pub session: Option<SessionId>,
51
52 /// Configuration for this specific operator execution.
53 /// None means "use the operator runtime's defaults."
54 pub config: Option<OperatorConfig>,
55
56 /// Opaque metadata that passes through the operator unchanged.
57 /// Useful for tracing (trace_id), routing (priority), or
58 /// domain-specific context that the protocol doesn't need
59 /// to understand.
60 #[serde(default)]
61 pub metadata: serde_json::Value,
62}
63
64/// Per-operator configuration overrides. Every field is optional —
65/// None means "use the implementation's default."
66#[non_exhaustive]
67#[derive(Debug, Clone, Default, Serialize, Deserialize)]
68pub struct OperatorConfig {
69 /// Maximum iterations of the inner ReAct loop.
70 pub max_turns: Option<u32>,
71
72 /// Maximum cost for this operator invocation in USD.
73 pub max_cost: Option<Decimal>,
74
75 /// Maximum wall-clock time for this operator invocation.
76 pub max_duration: Option<DurationMs>,
77
78 /// Model override (implementation-specific string).
79 pub model: Option<String>,
80
81 /// Operator restrictions for this operator invocation.
82 /// None = use defaults. Some(list) = only these operators.
83 #[serde(alias = "allowed_tools")]
84 pub allowed_operators: Option<Vec<String>>,
85
86 /// Additional system prompt content to prepend/append.
87 /// Does not replace the operator runtime's base identity —
88 /// it augments it. Use for per-task instructions.
89 pub system_addendum: Option<String>,
90}
91
92/// Why an operator invocation ended. The caller needs to know this to decide
93/// what happens next (retry? continue? escalate?).
94#[non_exhaustive]
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96#[serde(rename_all = "snake_case")]
97pub enum ExitReason {
98 /// Model produced a final text response (natural completion).
99 Complete,
100 /// Hit the max_turns limit.
101 MaxTurns,
102 /// Hit the cost budget (`max_cost`) or the tool-call step limit (`max_tool_calls`).
103 /// Use `BudgetEvent` sink notifications to distinguish the two causes.
104 BudgetExhausted,
105 /// Circuit breaker tripped (consecutive failures).
106 CircuitBreaker,
107 /// Wall-clock timeout.
108 Timeout,
109 /// Interceptor/middleware halted execution.
110 InterceptorHalt {
111 /// The reason the interceptor halted execution.
112 reason: String,
113 },
114 /// Unrecoverable error during execution.
115 Error,
116 /// Provider safety system stopped generation (HTTP 200, content filtered).
117 ///
118 /// Semantically distinct from `Error` (not a transport or execution failure)
119 /// and `Complete` (model did not finish naturally). Arrives via
120 /// `StopReason::ContentFilter` in the provider response — the provider
121 /// acknowledged the request but refused to complete it. Not retriable
122 /// without modification to the context or request.
123 SafetyStop {
124 /// Human-readable reason string supplied by the provider or runtime.
125 reason: String,
126 },
127 /// One or more tool calls require human approval before execution.
128 /// The calling layer should inspect [`OperatorOutput::effects`] for
129 /// [`Effect::ToolApprovalRequired`] entries, obtain approval, then
130 /// either execute the tools and re-enter the loop, or inject a denial
131 /// message and re-enter.
132 AwaitingApproval,
133 /// Future exit reasons.
134 Custom(String),
135}
136
137/// Output from an operator. Contains the response, metadata about
138/// execution, and any side-effects the operator wants executed.
139#[non_exhaustive]
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct OperatorOutput {
142 /// The operator's response content.
143 pub message: Content,
144
145 /// Why the operator invocation ended.
146 pub exit_reason: ExitReason,
147
148 /// Execution metadata (cost, tokens, timing).
149 pub metadata: OperatorMetadata,
150
151 /// Side-effects the operator wants executed.
152 ///
153 /// CRITICAL DESIGN DECISION: The operator declares effects but does
154 /// not execute them. The calling layer (orchestrator, lifecycle
155 /// coordinator) decides when and how to execute them. This is
156 /// what makes the operator runtime independent of the layers around it.
157 ///
158 /// An operator running in-process has its effects executed immediately.
159 /// An operator running in a Temporal activity has its effects serialized
160 /// and executed by the workflow. Same operator code, different execution.
161 #[serde(default)]
162 pub effects: Vec<Effect>,
163}
164
165/// Execution metadata. Every field is concrete (not optional) because
166/// every operator produces this data. Implementations that can't track
167/// a field (e.g., cost for a local model) use zero/default.
168#[non_exhaustive]
169#[derive(Debug, Clone, Serialize, Deserialize)]
170pub struct OperatorMetadata {
171 /// Input tokens consumed.
172 pub tokens_in: u64,
173 /// Output tokens generated.
174 pub tokens_out: u64,
175 /// Cost in USD.
176 pub cost: Decimal,
177 /// Number of ReAct loop iterations used.
178 pub turns_used: u32,
179 /// Record of each sub-dispatch made.
180 #[serde(alias = "tools_called")]
181 pub sub_dispatches: Vec<SubDispatchRecord>,
182 /// Wall-clock duration of the operator invocation.
183 pub duration: DurationMs,
184}
185
186/// Record of a single sub-dispatch within an operator execution.
187#[non_exhaustive]
188#[derive(Debug, Clone, Serialize, Deserialize)]
189pub struct SubDispatchRecord {
190 /// Name of the operator (sub-dispatch) that was called.
191 pub name: String,
192 /// How long the sub-dispatch took.
193 pub duration: DurationMs,
194 /// Whether the call succeeded.
195 pub success: bool,
196}
197
198impl Default for OperatorMetadata {
199 fn default() -> Self {
200 Self {
201 tokens_in: 0,
202 tokens_out: 0,
203 cost: Decimal::ZERO,
204 turns_used: 0,
205 sub_dispatches: vec![],
206 duration: DurationMs::ZERO,
207 }
208 }
209}
210
211impl OperatorInput {
212 /// Create a new OperatorInput with required fields.
213 pub fn new(message: Content, trigger: TriggerType) -> Self {
214 Self {
215 message,
216 trigger,
217 session: None,
218 config: None,
219 metadata: serde_json::Value::Null,
220 }
221 }
222}
223
224impl OperatorOutput {
225 /// Create a new OperatorOutput with required fields.
226 pub fn new(message: Content, exit_reason: ExitReason) -> Self {
227 Self {
228 message,
229 exit_reason,
230 metadata: OperatorMetadata::default(),
231 effects: vec![],
232 }
233 }
234}
235
236impl SubDispatchRecord {
237 /// Create a new SubDispatchRecord.
238 pub fn new(name: impl Into<String>, duration: DurationMs, success: bool) -> Self {
239 Self {
240 name: name.into(),
241 duration,
242 success,
243 }
244 }
245}
246
247/// Metadata describing a tool/sub-operator's external interface.
248///
249/// Used by orchestrators, MCP servers, and any component that needs
250/// to advertise an operator's capabilities to external systems
251/// (including LLM tool-use schemas).
252///
253/// This is the bridge between the operator protocol and tool-use APIs:
254/// an operator that exposes itself as a "tool" attaches `ToolMetadata`
255/// so callers know how to invoke it.
256#[non_exhaustive]
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct ToolMetadata {
259 /// Human-readable name for the tool.
260 pub name: String,
261 /// Description of what the tool does (shown to LLMs in tool-use prompts).
262 pub description: String,
263 /// JSON Schema describing the expected input.
264 pub input_schema: serde_json::Value,
265 /// Whether this tool is safe to call concurrently with other tools.
266 /// Used by dispatch planners to decide parallel vs. sequential execution.
267 pub parallel_safe: bool,
268}
269
270impl ToolMetadata {
271 /// Create a new `ToolMetadata`.
272 pub fn new(
273 name: impl Into<String>,
274 description: impl Into<String>,
275 input_schema: serde_json::Value,
276 parallel_safe: bool,
277 ) -> Self {
278 Self {
279 name: name.into(),
280 description: description.into(),
281 input_schema,
282 parallel_safe,
283 }
284 }
285}
286
287// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
288// THE TRAIT
289// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
290
291/// Protocol ① — The Operator
292///
293/// What one operator does per cycle. Receives input, assembles context,
294/// reasons (model call), acts (tool execution), produces output.
295///
296/// The ReAct while-loop, the agentic loop, the augmented LLM —
297/// whatever you call it, this trait is its boundary.
298///
299/// Implementations:
300/// - skelegent's AgentLoop (full-featured operator with tools + context mgmt)
301/// - A raw API call wrapper (minimal, no tools)
302/// - A human-in-the-loop adapter (waits for human input)
303/// - A mock (for testing)
304///
305/// The trait is intentionally one method. The operator is atomic from the
306/// outside — you send input, you get output. Everything that happens
307/// inside (how many model calls, how many tool uses, what context
308/// strategy) is the implementation's concern.
309#[async_trait]
310pub trait Operator: Send + Sync {
311 /// Execute a single operator invocation.
312 ///
313 /// The operator runtime:
314 /// 1. Assembles context (identity + history + memory + tools)
315 /// 2. Runs the ReAct loop (reason → act → observe → repeat)
316 /// 3. Returns the output + effects
317 ///
318 /// The operator MAY read from a StateStore during context assembly.
319 /// The operator MUST NOT write to external state directly — it
320 /// declares writes as Effects in the output.
321 async fn execute(&self, input: OperatorInput) -> Result<OperatorOutput, OperatorError>;
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn tool_metadata_construction() {
        let schema = serde_json::json!({
            "type": "object",
            "properties": {
                "query": { "type": "string" }
            },
            "required": ["query"]
        });
        let meta = ToolMetadata::new("search", "Search the web", schema.clone(), true);
        assert_eq!(meta.name, "search");
        assert_eq!(meta.description, "Search the web");
        assert_eq!(meta.input_schema, schema);
        assert!(meta.parallel_safe);
    }

    #[test]
    fn tool_metadata_serde_roundtrip() {
        let schema = serde_json::json!({"type": "object"});
        let meta = ToolMetadata::new(
            "code_exec",
            "Execute code in a sandbox",
            schema.clone(),
            false,
        );
        let json = serde_json::to_string(&meta).expect("serialize");
        let back: ToolMetadata = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(back.name, "code_exec");
        assert_eq!(back.description, "Execute code in a sandbox");
        // The schema must survive the round trip too, not just the scalar fields.
        assert_eq!(back.input_schema, schema);
        assert!(!back.parallel_safe);
    }

    /// `rename_all = "snake_case"` is part of the wire contract; pin it.
    #[test]
    fn trigger_type_wire_format_is_snake_case() {
        let value = serde_json::to_value(TriggerType::SystemEvent).expect("serialize");
        assert_eq!(value, serde_json::json!("system_event"));

        let custom: TriggerType =
            serde_json::from_value(serde_json::json!({"custom": "webhook"})).expect("deserialize");
        assert_eq!(custom, TriggerType::Custom("webhook".to_string()));
    }

    /// Unit and struct variants of `ExitReason` use snake_case tags and round-trip.
    #[test]
    fn exit_reason_wire_format_is_snake_case() {
        let value = serde_json::to_value(ExitReason::MaxTurns).expect("serialize");
        assert_eq!(value, serde_json::json!("max_turns"));

        let halt = ExitReason::InterceptorHalt {
            reason: "blocked".to_string(),
        };
        let value = serde_json::to_value(&halt).expect("serialize");
        assert_eq!(
            value,
            serde_json::json!({"interceptor_halt": {"reason": "blocked"}})
        );
        let back: ExitReason = serde_json::from_value(value).expect("deserialize");
        assert_eq!(back, halt);
    }

    /// The legacy `allowed_tools` key is still accepted on input, while output
    /// always uses the canonical `allowed_operators` key.
    #[test]
    fn operator_config_accepts_allowed_tools_alias() {
        let config: OperatorConfig =
            serde_json::from_value(serde_json::json!({"allowed_tools": ["search"]}))
                .expect("deserialize");
        assert_eq!(config.allowed_operators, Some(vec!["search".to_string()]));
        assert_eq!(config.max_turns, None);

        let value = serde_json::to_value(&config).expect("serialize");
        assert_eq!(value["allowed_operators"], serde_json::json!(["search"]));
        assert!(value.get("allowed_tools").is_none());
    }
}
358}