1use std::sync::{Arc, Mutex};
2
3use tokio::sync::mpsc;
4
5use crate::plugin::{
6 PluginSession, SessionGraphService, SessionLifecycleService, SessionStateService,
7};
8use crate::{
9 PreparedToolCall, SessionEvent, ToolCallRecord, ToolCatalog, ToolFailure, ToolFailureClass,
10 ToolProvider, ToolResult,
11};
12
13#[derive(Clone, Default)]
14pub(crate) struct CheckpointMessageBuffer {
15 queue: Arc<Mutex<Vec<crate::PluginMessage>>>,
16}
17
18impl CheckpointMessageBuffer {
19 pub(crate) fn enqueue(&self, messages: Vec<crate::PluginMessage>) -> Result<(), String> {
20 let mut queue = self
21 .queue
22 .lock()
23 .map_err(|_| "checkpoint message buffer poisoned".to_string())?;
24 queue.extend(messages);
25 Ok(())
26 }
27
28 pub(crate) fn drain(&self) -> Result<Vec<crate::PluginMessage>, String> {
29 let mut queue = self
30 .queue
31 .lock()
32 .map_err(|_| "checkpoint message buffer poisoned".to_string())?;
33 Ok(queue.drain(..).collect())
34 }
35}
36
37#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
38pub struct ToolTriggerEffectOutcome {
39 pub source_type: String,
40 pub source_key: String,
41 pub occurrence_id: String,
42 #[serde(default)]
43 pub payload: serde_json::Value,
44 pub idempotency_key: String,
45 #[serde(default, skip_serializing_if = "Option::is_none")]
46 pub source: Option<serde_json::Value>,
47 pub started_process_ids: Vec<String>,
48}
49
50#[derive(Clone, Default)]
51pub(crate) struct ToolTriggerOutcomeBuffer {
52 queue: Arc<Mutex<Vec<ToolTriggerEffectOutcome>>>,
53}
54
55impl ToolTriggerOutcomeBuffer {
56 pub(crate) fn enqueue(&self, outcome: ToolTriggerEffectOutcome) -> Result<(), String> {
57 let mut queue = self
58 .queue
59 .lock()
60 .map_err(|_| "tool trigger outcome buffer poisoned".to_string())?;
61 queue.push(outcome);
62 Ok(())
63 }
64
65 pub(crate) fn drain(&self) -> Result<Vec<ToolTriggerEffectOutcome>, String> {
66 let mut queue = self
67 .queue
68 .lock()
69 .map_err(|_| "tool trigger outcome buffer poisoned".to_string())?;
70 Ok(queue.drain(..).collect())
71 }
72}
73
74#[derive(Clone)]
75pub struct ToolDispatchContext<'run> {
76 pub plugins: Arc<PluginSession>,
77 pub tools: Arc<dyn ToolProvider>,
78 pub tool_catalog: Arc<ToolCatalog>,
79 pub sessions: Arc<dyn SessionStateService>,
80 pub session_lifecycle: Arc<dyn SessionLifecycleService>,
81 pub session_graph: Arc<dyn SessionGraphService>,
82 pub processes: Arc<dyn crate::ProcessService>,
83 pub process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
84 pub trigger_router: Option<crate::TriggerRouter>,
85 pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
86 pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
87 pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
88 pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
89 pub session_id: String,
90 pub agent_frame_id: crate::AgentFrameId,
91 pub event_tx: mpsc::Sender<SessionEvent>,
92 pub(crate) checkpoint_messages: CheckpointMessageBuffer,
93 pub(crate) trigger_outcomes: ToolTriggerOutcomeBuffer,
94 pub attachment_store: Arc<dyn crate::AttachmentStore>,
95 pub turn_context: crate::TurnContext,
96 pub clock: Arc<dyn crate::Clock>,
97}
98
99impl<'run> ToolDispatchContext<'run> {
100 pub fn process_scope(&self) -> crate::ProcessOpScope<'_> {
101 crate::ProcessOpScope::new(self.effect_controller.scoped())
102 .with_parent_invocation(self.parent_invocation.clone())
103 .with_agent_frame_id(Some(self.agent_frame_id.clone()))
104 }
105}
106
107#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
108pub(crate) struct ToolDispatchOutcome {
109 pub record: ToolCallRecord,
110}
111
112#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
113pub(crate) struct PendingToolDispatchOutcome {
114 pub tool_name: String,
115 pub args: serde_json::Value,
116 pub key: crate::AwaitEventKey,
117 pub pending: crate::PendingCompletion,
118 pub duration_ms: u64,
119}
120
121#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
122#[serde(tag = "status", rename_all = "snake_case")]
123pub(crate) enum ToolCallLaunch {
124 Done(ToolDispatchOutcome),
125 Pending(PendingToolDispatchOutcome),
126}
127
128pub(crate) enum ToolPreparationOutcome {
129 Prepared(PreparedToolCall),
130 Completed(Box<ToolDispatchOutcome>),
131}
132
133pub(super) fn completed_preparation(outcome: ToolDispatchOutcome) -> ToolPreparationOutcome {
134 ToolPreparationOutcome::Completed(Box::new(outcome))
135}
136pub(super) fn outcome(
137 tool_name: String,
138 args: serde_json::Value,
139 result: ToolResult,
140 duration_ms: u64,
141) -> ToolDispatchOutcome {
142 let record = ToolCallRecord {
143 call_id: None,
144 tool: tool_name,
145 args,
146 output: result.into_done_output().unwrap_or_else(|_| {
147 crate::ToolCallOutput::failure(crate::ToolFailure::runtime(
148 crate::ToolFailureClass::Internal,
149 "pending_tool_not_finalized",
150 "pending tool result reached a completed-output projection path",
151 ))
152 }),
153 duration_ms,
154 };
155 ToolDispatchOutcome { record }
156}
157
158pub(super) fn launch_done(outcome: ToolDispatchOutcome) -> ToolCallLaunch {
159 ToolCallLaunch::Done(outcome)
160}
161
162pub(super) fn runtime_failure(
163 class: ToolFailureClass,
164 code: impl Into<String>,
165 message: impl Into<String>,
166) -> ToolResult {
167 ToolResult::failure(ToolFailure::runtime(class, code, message))
168}