Skip to main content

meerkat_core/
ops.rs

1//! Async operation types for Meerkat
2//!
3//! Unified abstraction for tool calls, shell commands, and delegated branches.
4
5use crate::budget::BudgetLimits;
6use crate::session::ToolVisibilityWitness;
7use crate::types::Message;
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, BTreeSet};
10use uuid::Uuid;
11
12/// Unique identifier for an operation
13#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub struct OperationId(pub Uuid);
15
16/// Wait policy for async operations.
17///
18/// Determines whether an operation blocks the turn boundary (`Barrier`) or runs
19/// independently (`Detached`).
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
21#[serde(rename_all = "snake_case")]
22pub enum WaitPolicy {
23    /// Operation must complete before `ToolCallsResolved` can fire.
24    Barrier,
25    /// Operation runs independently and does not block the turn.
26    Detached,
27}
28
29/// Typed async operation reference carrying an operation ID and its wait policy.
30///
31/// Replaces raw `OperationId` sequences in the TurnExecution machine state to
32/// enable barrier-aware scheduling. Only `Barrier` ops block the turn boundary;
33/// `Detached` ops are recorded but do not gate `ToolCallsResolved`.
34#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
35pub struct AsyncOpRef {
36    pub operation_id: OperationId,
37    pub wait_policy: WaitPolicy,
38}
39
40impl WaitPolicy {
41    /// Normal tool-call operations that must complete before the turn boundary.
42    pub fn barrier() -> Self {
43        Self::Barrier
44    }
45
46    /// Background or mob-child operations that run independently of the turn.
47    pub fn detached() -> Self {
48        Self::Detached
49    }
50}
51
52impl AsyncOpRef {
53    /// Create a barrier op ref — blocks the turn boundary until resolved.
54    pub fn barrier(operation_id: OperationId) -> Self {
55        Self {
56            operation_id,
57            wait_policy: WaitPolicy::barrier(),
58        }
59    }
60
61    /// Create a detached op ref — runs independently, does not block the turn.
62    pub fn detached(operation_id: OperationId) -> Self {
63        Self {
64            operation_id,
65            wait_policy: WaitPolicy::detached(),
66        }
67    }
68}
69
70/// Outcome of a tool dispatch, separating transcript data from execution metadata.
71///
72/// `result` is what the model sees (conversation/transcript). `async_ops` is
73/// what the runtime scheduler sees (barrier/detached classification). This
74/// prevents hooks, persistence, and message serialization from accidentally
75/// owning barrier semantics.
76/// Typed session-level effect produced by tool dispatch.
77///
78/// Tools that need to mutate session-owned durable state (e.g., mob authority)
79/// must NOT call `SessionService` methods from inside dispatch. Instead they
80/// return typed effects here, and the turn owner (agent loop) merges and commits
81/// them after the parallel tool batch completes.
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83#[serde(tag = "effect_type", rename_all = "snake_case")]
84pub enum SessionEffect {
85    /// Grant management scope for a specific mob.
86    GrantManageMob { mob_id: String },
87    /// Record durable deferred-tool requests for subsequent boundaries.
88    RequestDeferredTools {
89        names: BTreeSet<String>,
90        witnesses: BTreeMap<String, ToolVisibilityWitness>,
91    },
92}
93
94#[derive(Debug, Clone)]
95pub struct ToolDispatchOutcome {
96    /// The tool result for the conversation transcript.
97    pub result: crate::types::ToolResult,
98    /// Async operations started by this dispatch, with typed wait policies.
99    ///
100    /// Empty for synchronous tools. Barrier ops block the turn boundary;
101    /// detached ops run independently.
102    pub async_ops: Vec<AsyncOpRef>,
103    /// Session-level effects to be merged by the turn owner after the batch.
104    ///
105    /// Most tools return an empty vec. Tools that need durable session state
106    /// changes (e.g., mob authority grants) emit typed effects here instead
107    /// of calling `SessionService` from inside dispatch.
108    pub session_effects: Vec<SessionEffect>,
109}
110
111impl ToolDispatchOutcome {
112    /// Create an outcome with no async operations or session effects (synchronous tool).
113    pub fn sync_result(result: crate::types::ToolResult) -> Self {
114        Self {
115            result,
116            async_ops: Vec::new(),
117            session_effects: Vec::new(),
118        }
119    }
120}
121
122impl From<crate::types::ToolResult> for ToolDispatchOutcome {
123    fn from(result: crate::types::ToolResult) -> Self {
124        Self::sync_result(result)
125    }
126}
127
128impl OperationId {
129    /// Create a new operation ID
130    pub fn new() -> Self {
131        Self(Uuid::now_v7())
132    }
133}
134
135impl Default for OperationId {
136    fn default() -> Self {
137        Self::new()
138    }
139}
140
141impl std::fmt::Display for OperationId {
142    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143        write!(f, "{}", self.0)
144    }
145}
146
147/// What kind of work the operation performs
148#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
149#[serde(rename_all = "snake_case")]
150pub enum WorkKind {
151    /// MCP or internal tool call
152    ToolCall,
153    /// Shell command execution
154    ShellCommand,
155}
156
157/// Shape of the operation's result
158#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
159#[serde(rename_all = "snake_case")]
160pub enum ResultShape {
161    /// Single result value
162    Single,
163    /// Streaming output (progress events)
164    Stream,
165    /// Multiple results (e.g., fork branches)
166    Batch,
167}
168
169/// How much context a delegated branch receives
170#[derive(Debug, Clone, Default, Serialize, Deserialize)]
171#[serde(tag = "type", content = "value", rename_all = "snake_case")]
172pub enum ContextStrategy {
173    /// Complete conversation history (Fork default)
174    #[default]
175    FullHistory,
176    /// Last N turns from parent
177    LastTurns(u32),
178    /// Compressed summary of conversation
179    Summary { max_tokens: u32 },
180    /// Explicit message list
181    Custom { messages: Vec<Message> },
182}
183
184/// How to allocate budget when forking
185#[derive(Debug, Clone, Default, Serialize, Deserialize)]
186#[serde(tag = "type", content = "value", rename_all = "snake_case")]
187pub enum ForkBudgetPolicy {
188    /// Split remaining budget equally among branches
189    #[default]
190    Equal,
191    /// Split proportionally based on weights
192    Proportional,
193    /// Fixed budget per branch
194    Fixed(u64),
195    /// Give all remaining budget to each branch
196    Remaining,
197}
198
199/// Tool access control for delegated branches
200#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
201#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
202#[serde(tag = "type", content = "value", rename_all = "snake_case")]
203pub enum ToolAccessPolicy {
204    /// Inherit all tools from parent
205    #[default]
206    Inherit,
207    /// Only allow specific tools
208    AllowList(Vec<String>),
209    /// Block specific tools
210    DenyList(Vec<String>),
211}
212
213/// Policy for operation execution
214#[derive(Debug, Clone, Serialize, Deserialize, Default)]
215pub struct OperationPolicy {
216    /// Timeout for this operation
217    pub timeout_ms: Option<u64>,
218    /// Whether to cancel on parent cancellation
219    pub cancel_on_parent_cancel: bool,
220    /// Whether to include in checkpoints
221    pub checkpoint_results: bool,
222}
223
224/// Complete operation specification
225#[derive(Debug, Clone, Serialize, Deserialize)]
226pub struct OperationSpec {
227    pub id: OperationId,
228    pub kind: WorkKind,
229    pub result_shape: ResultShape,
230    pub policy: OperationPolicy,
231    pub budget_reservation: BudgetLimits,
232    pub depth: u32,
233    pub depends_on: Vec<OperationId>,
234    pub context: Option<ContextStrategy>,
235    pub tool_access: Option<ToolAccessPolicy>,
236}
237
238/// Result of a completed operation
239#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
240pub struct OperationResult {
241    pub id: OperationId,
242    pub content: String,
243    pub is_error: bool,
244    pub duration_ms: u64,
245    pub tokens_used: u64,
246}
247
248/// Events from operations
249#[derive(Debug, Clone, Serialize, Deserialize)]
250#[serde(tag = "type", rename_all = "snake_case")]
251pub enum OpEvent {
252    /// Operation started executing
253    Started { id: OperationId, kind: WorkKind },
254
255    /// Progress update (for streaming operations)
256    Progress {
257        id: OperationId,
258        message: String,
259        percent: Option<f32>,
260    },
261
262    /// Operation completed successfully
263    Completed {
264        id: OperationId,
265        result: OperationResult,
266    },
267
268    /// Operation failed
269    Failed { id: OperationId, error: String },
270
271    /// Operation was cancelled
272    Cancelled { id: OperationId },
273}
274
275/// Concurrency limits for operations
276#[derive(Debug, Clone, Serialize, Deserialize)]
277pub struct ConcurrencyLimits {
278    /// Maximum delegated-branch nesting depth
279    pub max_depth: u32,
280    /// Maximum concurrent operations (all types)
281    pub max_concurrent_ops: usize,
282    /// Maximum concurrent delegated branches specifically
283    pub max_concurrent_agents: usize,
284    /// Maximum children per parent agent
285    pub max_children_per_agent: usize,
286}
287
288impl Default for ConcurrencyLimits {
289    fn default() -> Self {
290        Self {
291            max_depth: 3,
292            max_concurrent_ops: 32,
293            max_concurrent_agents: 8,
294            max_children_per_agent: 5,
295        }
296    }
297}
298
299/// Specification for spawning a new delegated branch
300#[derive(Debug, Clone, Serialize, Deserialize, Default)]
301pub struct SpawnSpec {
302    /// The prompt/task for the delegated branch
303    pub prompt: String,
304    /// How much context the delegated branch receives
305    pub context: ContextStrategy,
306    /// Which tools the delegated branch can access
307    pub tool_access: ToolAccessPolicy,
308    /// Budget allocation for the delegated branch
309    pub budget: BudgetLimits,
310    /// If false, the delegated branch cannot spawn/fork further
311    pub allow_spawn: bool,
312    /// System prompt override (None = inherit from parent)
313    pub system_prompt: Option<String>,
314}
315
316/// A branch in a fork operation
317#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct ForkBranch {
319    /// Identifier for this branch
320    pub name: String,
321    /// The prompt/task for this branch
322    pub prompt: String,
323    /// Tool access override (None = inherit)
324    pub tool_access: Option<ToolAccessPolicy>,
325}
326
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Both barrier constructors yield the `Barrier` policy.
    #[test]
    fn barrier_constructor_produces_barrier_policy() {
        assert_eq!(WaitPolicy::barrier(), WaitPolicy::Barrier);

        let barrier_ref = AsyncOpRef::barrier(OperationId::new());
        assert_eq!(barrier_ref.wait_policy, WaitPolicy::Barrier);
    }

    /// Both detached constructors yield the `Detached` policy.
    #[test]
    fn detached_constructor_produces_detached_policy() {
        assert_eq!(WaitPolicy::detached(), WaitPolicy::Detached);

        let detached_ref = AsyncOpRef::detached(OperationId::new());
        assert_eq!(detached_ref.wait_policy, WaitPolicy::Detached);
    }

    /// An `OperationId` survives a JSON string round trip unchanged.
    #[test]
    fn test_operation_id_encoding() {
        let original = OperationId::new();
        let encoded = serde_json::to_string(&original).unwrap();

        let decoded = serde_json::from_str::<OperationId>(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// `WorkKind` variants serialize to their snake_case names.
    #[test]
    fn test_work_kind_serialization() {
        let tool_call = serde_json::to_value(WorkKind::ToolCall).unwrap();
        assert_eq!(tool_call, "tool_call");

        let shell_command = serde_json::to_value(WorkKind::ShellCommand).unwrap();
        assert_eq!(shell_command, "shell_command");
    }

    /// `ContextStrategy` uses the adjacently tagged `type`/`value` layout.
    #[test]
    fn test_context_strategy_serialization() {
        let full_json = serde_json::to_value(&ContextStrategy::FullHistory).unwrap();
        assert_eq!(full_json["type"], "full_history");

        // Tuple variant: {"type": "last_turns", "value": 5}
        let last_json = serde_json::to_value(&ContextStrategy::LastTurns(5)).unwrap();
        assert_eq!(last_json["type"], "last_turns");
        assert_eq!(last_json["value"], 5);

        // Struct variant: {"type": "summary", "value": {"max_tokens": 1000}}
        let summary_json =
            serde_json::to_value(&ContextStrategy::Summary { max_tokens: 1000 }).unwrap();
        assert_eq!(summary_json["type"], "summary");
        assert_eq!(summary_json["value"]["max_tokens"], 1000);

        // Decode the summary JSON again and check the payload came back intact.
        let decoded: ContextStrategy = serde_json::from_value(summary_json).unwrap();
        if let ContextStrategy::Summary { max_tokens } = decoded {
            assert_eq!(max_tokens, 1000);
        } else {
            unreachable!("Wrong variant");
        }
    }

    /// `ForkBudgetPolicy` uses the adjacently tagged `type`/`value` layout.
    #[test]
    fn test_fork_budget_policy_serialization() {
        let unit_variants = [
            (ForkBudgetPolicy::Equal, "equal"),
            (ForkBudgetPolicy::Proportional, "proportional"),
            (ForkBudgetPolicy::Remaining, "remaining"),
        ];

        for (policy, expected_type) in unit_variants {
            let encoded = serde_json::to_value(&policy).unwrap();
            assert_eq!(encoded["type"], expected_type);
        }

        // Tuple variant: {"type": "fixed", "value": 5000}
        let fixed_json = serde_json::to_value(&ForkBudgetPolicy::Fixed(5000)).unwrap();
        assert_eq!(fixed_json["type"], "fixed");
        assert_eq!(fixed_json["value"], 5000);

        // Decode the fixed-budget JSON again and check the amount.
        let decoded: ForkBudgetPolicy = serde_json::from_value(fixed_json).unwrap();
        if let ForkBudgetPolicy::Fixed(tokens) = decoded {
            assert_eq!(tokens, 5000);
        } else {
            unreachable!("Wrong variant");
        }
    }

    /// `ToolAccessPolicy` uses the adjacently tagged `type`/`value` layout.
    #[test]
    fn test_tool_access_policy_serialization() {
        let inherit_json = serde_json::to_value(&ToolAccessPolicy::Inherit).unwrap();
        assert_eq!(inherit_json["type"], "inherit");

        // List variants: {"type": "allow_list", "value": [...]}
        let allowed = vec!["read_file".to_string(), "write_file".to_string()];
        let allow_json = serde_json::to_value(&ToolAccessPolicy::AllowList(allowed)).unwrap();
        assert_eq!(allow_json["type"], "allow_list");
        assert!(allow_json["value"].is_array());

        let denied = vec!["dangerous_tool".to_string()];
        let deny_json = serde_json::to_value(&ToolAccessPolicy::DenyList(denied)).unwrap();
        assert_eq!(deny_json["type"], "deny_list");
        assert!(deny_json["value"].is_array());

        // Decode the deny-list JSON again and check its single entry.
        let decoded: ToolAccessPolicy = serde_json::from_value(deny_json).unwrap();
        if let ToolAccessPolicy::DenyList(tools) = decoded {
            assert_eq!(tools.len(), 1);
            assert_eq!(tools[0], "dangerous_tool");
        } else {
            unreachable!("Wrong variant");
        }
    }

    /// Every `OpEvent` variant carries a `type` tag and can be decoded again.
    #[test]
    fn test_op_event_serialization() {
        let started = OpEvent::Started {
            id: OperationId::new(),
            kind: WorkKind::ToolCall,
        };
        let progress = OpEvent::Progress {
            id: OperationId::new(),
            message: "50% complete".to_string(),
            percent: Some(0.5),
        };
        let completed = OpEvent::Completed {
            id: OperationId::new(),
            result: OperationResult {
                id: OperationId::new(),
                content: "result".to_string(),
                is_error: false,
                duration_ms: 100,
                tokens_used: 50,
            },
        };
        let failed = OpEvent::Failed {
            id: OperationId::new(),
            error: "timeout".to_string(),
        };
        let cancelled = OpEvent::Cancelled {
            id: OperationId::new(),
        };

        for event in [started, progress, completed, failed, cancelled] {
            let encoded = serde_json::to_value(&event).unwrap();
            assert!(encoded.get("type").is_some());

            // Decoding must succeed; the decoded value itself is not inspected.
            let _: OpEvent = serde_json::from_value(encoded).unwrap();
        }
    }

    /// The default limits are pinned to their documented values.
    #[test]
    fn test_concurrency_limits_default() {
        let ConcurrencyLimits {
            max_depth,
            max_concurrent_ops,
            max_concurrent_agents,
            max_children_per_agent,
        } = ConcurrencyLimits::default();

        assert_eq!(max_depth, 3);
        assert_eq!(max_concurrent_ops, 32);
        assert_eq!(max_concurrent_agents, 8);
        assert_eq!(max_children_per_agent, 5);
    }

    /// A `GrantManageMob` effect survives a JSON round trip unchanged.
    #[test]
    fn session_effect_grant_manage_mob_serde_round_trip() {
        let original = SessionEffect::GrantManageMob {
            mob_id: "test-mob".into(),
        };
        let encoded = serde_json::to_value(&original).unwrap();
        let decoded: SessionEffect = serde_json::from_value(encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// A hand-built outcome keeps the session effects it was given.
    #[test]
    fn tool_dispatch_outcome_with_session_effects() {
        let grant = SessionEffect::GrantManageMob {
            mob_id: "mob-1".into(),
        };
        let outcome = ToolDispatchOutcome {
            result: crate::types::ToolResult::new("t1".into(), "ok".into(), false),
            async_ops: Vec::new(),
            session_effects: vec![grant],
        };
        assert_eq!(outcome.session_effects.len(), 1);
    }

    /// `sync_result` produces an outcome without session effects.
    #[test]
    fn tool_dispatch_outcome_sync_result_has_empty_effects() {
        let tool_result = crate::types::ToolResult::new("t1".into(), "ok".into(), false);
        let outcome = ToolDispatchOutcome::sync_result(tool_result);
        assert!(outcome.session_effects.is_empty());
    }
}