Skip to main content

layer0/
operator.rs

1//! The Operator protocol — what one operator does per cycle.
2
3use crate::{content::Content, duration::DurationMs, effect::Effect, error::OperatorError, id::*};
4use async_trait::async_trait;
5use rust_decimal::Decimal;
6use serde::{Deserialize, Serialize};
7
8/// What triggers an operator invocation. Informs context assembly — a scheduled trigger
9/// means you need to reconstruct everything from state, while a user
10/// message carries conversation context naturally.
11#[non_exhaustive]
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13#[serde(rename_all = "snake_case")]
14pub enum TriggerType {
15    /// Human sent a message.
16    User,
17    /// Another agent assigned a task.
18    Task,
19    /// Signal from another workflow/agent.
20    Signal,
21    /// Cron/schedule triggered.
22    Schedule,
23    /// System event (file change, webhook, etc.).
24    SystemEvent,
25    /// Future trigger types.
26    Custom(String),
27}
28
/// Input to an operator: everything the operator needs to execute.
///
/// Design decision: `OperatorInput` does NOT include conversation history
/// or memory contents. The operator runtime reads those from a StateStore
/// during context assembly. `OperatorInput` carries the *new* information
/// that triggered this invocation — not the accumulated state.
///
/// This keeps the protocol boundary clean: the caller provides what is
/// new, and the operator runtime decides how to assemble context from
/// what is new plus what is stored.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with a struct literal; use [`OperatorInput::new`] and then
/// assign the optional fields.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorInput {
    /// The new message/task/signal that triggered this operator invocation.
    pub message: Content,

    /// What caused this operator invocation to start.
    pub trigger: TriggerType,

    /// Session for conversation continuity. If `None`, the operator is
    /// stateless. The operator runtime uses this to read history from the
    /// StateStore.
    pub session: Option<SessionId>,

    /// Configuration for this specific operator execution.
    /// `None` means "use the operator runtime's defaults."
    pub config: Option<OperatorConfig>,

    /// Opaque metadata that passes through the operator unchanged.
    /// Useful for tracing (trace_id), routing (priority), or
    /// domain-specific context that the protocol doesn't need
    /// to understand.
    ///
    /// `#[serde(default)]` lets a payload omit this key entirely; the field
    /// then takes `serde_json::Value`'s default value.
    #[serde(default)]
    pub metadata: serde_json::Value,
}
63
/// Per-operator configuration overrides.
///
/// Every field is optional — `None` means "use the implementation's
/// default." The derived `Default` therefore yields a config with every
/// override unset.
#[non_exhaustive]
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct OperatorConfig {
    /// Maximum iterations of the inner ReAct loop.
    pub max_turns: Option<u32>,

    /// Maximum cost for this operator invocation, in USD.
    pub max_cost: Option<Decimal>,

    /// Maximum wall-clock time for this operator invocation.
    pub max_duration: Option<DurationMs>,

    /// Model override (implementation-specific string).
    pub model: Option<String>,

    /// Operator restrictions for this operator invocation.
    /// `None` = use defaults. `Some(list)` = only these operators.
    ///
    /// The serde alias means the key `allowed_tools` is also accepted when
    /// deserializing; serialization always writes `allowed_operators`.
    #[serde(alias = "allowed_tools")]
    pub allowed_operators: Option<Vec<String>>,

    /// Additional system prompt content to prepend/append.
    /// Does not replace the operator runtime's base identity —
    /// it augments it. Use for per-task instructions.
    pub system_addendum: Option<String>,
}
91
92/// Why an operator invocation ended. The caller needs to know this to decide
93/// what happens next (retry? continue? escalate?).
94#[non_exhaustive]
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96#[serde(rename_all = "snake_case")]
97pub enum ExitReason {
98    /// Model produced a final text response (natural completion).
99    Complete,
100    /// Hit the max_turns limit.
101    MaxTurns,
102    /// Hit the cost budget (`max_cost`) or the tool-call step limit (`max_tool_calls`).
103    /// Use `BudgetEvent` sink notifications to distinguish the two causes.
104    BudgetExhausted,
105    /// Circuit breaker tripped (consecutive failures).
106    CircuitBreaker,
107    /// Wall-clock timeout.
108    Timeout,
109    /// Interceptor/middleware halted execution.
110    InterceptorHalt {
111        /// The reason the interceptor halted execution.
112        reason: String,
113    },
114    /// Unrecoverable error during execution.
115    Error,
116    /// Provider safety system stopped generation (HTTP 200, content filtered).
117    ///
118    /// Semantically distinct from `Error` (not a transport or execution failure)
119    /// and `Complete` (model did not finish naturally). Arrives via
120    /// `StopReason::ContentFilter` in the provider response — the provider
121    /// acknowledged the request but refused to complete it. Not retriable
122    /// without modification to the context or request.
123    SafetyStop {
124        /// Human-readable reason string supplied by the provider or runtime.
125        reason: String,
126    },
127    /// One or more tool calls require human approval before execution.
128    /// The calling layer should inspect [`OperatorOutput::effects`] for
129    /// [`Effect::ToolApprovalRequired`] entries, obtain approval, then
130    /// either execute the tools and re-enter the loop, or inject a denial
131    /// message and re-enter.
132    AwaitingApproval,
133    /// Future exit reasons.
134    Custom(String),
135}
136
/// Output from an operator: the response, metadata about the execution,
/// and any side-effects the operator wants executed.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// build it with a struct literal; use [`OperatorOutput::new`] and then
/// assign `metadata` / `effects`.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorOutput {
    /// The operator's response content.
    pub message: Content,

    /// Why the operator invocation ended.
    pub exit_reason: ExitReason,

    /// Execution metadata (cost, tokens, timing).
    pub metadata: OperatorMetadata,

    /// Side-effects the operator wants executed.
    ///
    /// CRITICAL DESIGN DECISION: The operator declares effects but does
    /// not execute them. The calling layer (orchestrator, lifecycle
    /// coordinator) decides when and how to execute them. This is
    /// what makes the operator runtime independent of the layers around it.
    ///
    /// An operator running in-process has its effects executed immediately.
    /// An operator running in a Temporal activity has its effects serialized
    /// and executed by the workflow. Same operator code, different execution.
    ///
    /// `#[serde(default)]` lets a payload omit this key; the field then
    /// deserializes to an empty list.
    #[serde(default)]
    pub effects: Vec<Effect>,
}
164
/// Execution metadata for one operator invocation.
///
/// Every field is concrete (not optional) because every operator produces
/// this data. Implementations that can't track a field (e.g., cost for a
/// local model) use zero/default; the `Default` impl in this file supplies
/// exactly those zero values.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperatorMetadata {
    /// Input tokens consumed.
    pub tokens_in: u64,
    /// Output tokens generated.
    pub tokens_out: u64,
    /// Cost in USD.
    pub cost: Decimal,
    /// Number of ReAct loop iterations used.
    pub turns_used: u32,
    /// Record of each sub-dispatch made.
    ///
    /// The serde alias means the key `tools_called` is also accepted when
    /// deserializing; serialization always writes `sub_dispatches`.
    #[serde(alias = "tools_called")]
    pub sub_dispatches: Vec<SubDispatchRecord>,
    /// Wall-clock duration of the operator invocation.
    pub duration: DurationMs,
}
185
/// Record of a single sub-dispatch made within an operator execution.
///
/// Collected in [`OperatorMetadata::sub_dispatches`]. The struct is
/// `#[non_exhaustive]`; construct it with [`SubDispatchRecord::new`].
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubDispatchRecord {
    /// Name of the operator (sub-dispatch) that was called.
    pub name: String,
    /// How long the sub-dispatch took.
    pub duration: DurationMs,
    /// Whether the call succeeded.
    pub success: bool,
}
197
198impl Default for OperatorMetadata {
199    fn default() -> Self {
200        Self {
201            tokens_in: 0,
202            tokens_out: 0,
203            cost: Decimal::ZERO,
204            turns_used: 0,
205            sub_dispatches: vec![],
206            duration: DurationMs::ZERO,
207        }
208    }
209}
210
211impl OperatorInput {
212    /// Create a new OperatorInput with required fields.
213    pub fn new(message: Content, trigger: TriggerType) -> Self {
214        Self {
215            message,
216            trigger,
217            session: None,
218            config: None,
219            metadata: serde_json::Value::Null,
220        }
221    }
222}
223
224impl OperatorOutput {
225    /// Create a new OperatorOutput with required fields.
226    pub fn new(message: Content, exit_reason: ExitReason) -> Self {
227        Self {
228            message,
229            exit_reason,
230            metadata: OperatorMetadata::default(),
231            effects: vec![],
232        }
233    }
234}
235
236impl SubDispatchRecord {
237    /// Create a new SubDispatchRecord.
238    pub fn new(name: impl Into<String>, duration: DurationMs, success: bool) -> Self {
239        Self {
240            name: name.into(),
241            duration,
242            success,
243        }
244    }
245}
246
/// Metadata describing a tool/sub-operator's external interface.
///
/// Used by orchestrators, MCP servers, and any component that needs
/// to advertise an operator's capabilities to external systems
/// (including LLM tool-use schemas).
///
/// This is the bridge between the operator protocol and tool-use APIs:
/// an operator that exposes itself as a "tool" attaches `ToolMetadata`
/// so callers know how to invoke it.
///
/// The struct is `#[non_exhaustive]`; construct it with
/// [`ToolMetadata::new`].
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolMetadata {
    /// Human-readable name for the tool.
    pub name: String,
    /// Description of what the tool does (shown to LLMs in tool-use prompts).
    pub description: String,
    /// JSON Schema describing the expected input.
    pub input_schema: serde_json::Value,
    /// Whether this tool is safe to call concurrently with other tools.
    /// Used by dispatch planners to decide parallel vs. sequential execution.
    pub parallel_safe: bool,
}
269
270impl ToolMetadata {
271    /// Create a new `ToolMetadata`.
272    pub fn new(
273        name: impl Into<String>,
274        description: impl Into<String>,
275        input_schema: serde_json::Value,
276        parallel_safe: bool,
277    ) -> Self {
278        Self {
279            name: name.into(),
280            description: description.into(),
281            input_schema,
282            parallel_safe,
283        }
284    }
285}
286
287// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
288// THE TRAIT
289// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
290
/// Protocol ① — The Operator
///
/// What one operator does per cycle. Receives input, assembles context,
/// reasons (model call), acts (tool execution), produces output.
///
/// The ReAct while-loop, the agentic loop, the augmented LLM —
/// whatever you call it, this trait is its boundary.
///
/// Implementations:
/// - skelegent's AgentLoop (full-featured operator with tools + context mgmt)
/// - A raw API call wrapper (minimal, no tools)
/// - A human-in-the-loop adapter (waits for human input)
/// - A mock (for testing)
///
/// The trait is intentionally one method. The operator is atomic from the
/// outside — you send input, you get output. Everything that happens
/// inside (how many model calls, how many tool uses, what context
/// strategy) is the implementation's concern.
///
/// Implementors must be `Send + Sync`, as required by the supertrait
/// bounds.
#[async_trait]
pub trait Operator: Send + Sync {
    /// Execute a single operator invocation.
    ///
    /// The operator runtime:
    /// 1. Assembles context (identity + history + memory + tools)
    /// 2. Runs the ReAct loop (reason → act → observe → repeat)
    /// 3. Returns the output + effects
    ///
    /// The operator MAY read from a StateStore during context assembly.
    /// The operator MUST NOT write to external state directly — it
    /// declares writes as Effects in the output.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the invocation fails.
    ///
    /// NOTE(review): this file does not say when an implementation should
    /// return `Err(OperatorError)` versus `Ok` with
    /// `ExitReason::Error` — confirm the intended split.
    async fn execute(&self, input: OperatorInput) -> Result<OperatorOutput, OperatorError>;
}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// `ToolMetadata::new` stores each argument in the matching field.
    #[test]
    fn tool_metadata_construction() {
        let expected_schema = serde_json::json!({
            "type": "object",
            "properties": {
                "query": { "type": "string" }
            },
            "required": ["query"]
        });

        let built = ToolMetadata::new("search", "Search the web", expected_schema.clone(), true);

        assert_eq!(built.name, "search");
        assert_eq!(built.description, "Search the web");
        assert_eq!(built.input_schema, expected_schema);
        assert!(built.parallel_safe);
    }

    /// A `ToolMetadata` survives a JSON serialize/deserialize round trip.
    #[test]
    fn tool_metadata_serde_roundtrip() {
        let input_schema = serde_json::json!({"type": "object"});
        let original =
            ToolMetadata::new("code_exec", "Execute code in a sandbox", input_schema, false);

        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: ToolMetadata = serde_json::from_str(&encoded).expect("deserialize");

        assert_eq!(decoded.name, "code_exec");
        assert_eq!(decoded.description, "Execute code in a sandbox");
        assert!(!decoded.parallel_safe);
    }
}