Skip to main content

lash_core/tool_dispatch/
context.rs

1use std::sync::{Arc, Mutex};
2
3use tokio::sync::mpsc;
4
5use crate::plugin::{
6    PluginSession, SessionGraphService, SessionLifecycleService, SessionStateService,
7};
8use crate::{
9    PreparedToolCall, SessionEvent, ToolCallRecord, ToolFailure, ToolFailureClass, ToolProvider,
10    ToolResult, ToolSurface,
11};
12
13#[derive(Clone, Default)]
14pub(crate) struct CheckpointMessageBuffer {
15    queue: Arc<Mutex<Vec<crate::PluginMessage>>>,
16}
17
18impl CheckpointMessageBuffer {
19    pub(crate) fn enqueue(&self, messages: Vec<crate::PluginMessage>) -> Result<(), String> {
20        let mut queue = self
21            .queue
22            .lock()
23            .map_err(|_| "checkpoint message buffer poisoned".to_string())?;
24        queue.extend(messages);
25        Ok(())
26    }
27
28    pub(crate) fn drain(&self) -> Result<Vec<crate::PluginMessage>, String> {
29        let mut queue = self
30            .queue
31            .lock()
32            .map_err(|_| "checkpoint message buffer poisoned".to_string())?;
33        Ok(queue.drain(..).collect())
34    }
35}
36
37#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
38pub struct ToolHostEventEffectOutcome {
39    pub resource_type: String,
40    pub alias: String,
41    pub event: String,
42    pub source_type: String,
43    pub source_key: String,
44    pub occurrence_id: String,
45    #[serde(default)]
46    pub payload: serde_json::Value,
47    pub started_process_ids: Vec<String>,
48}
49
50#[derive(Clone, Default)]
51pub(crate) struct ToolHostEventOutcomeBuffer {
52    queue: Arc<Mutex<Vec<ToolHostEventEffectOutcome>>>,
53}
54
55impl ToolHostEventOutcomeBuffer {
56    pub(crate) fn enqueue(&self, outcome: ToolHostEventEffectOutcome) -> Result<(), String> {
57        let mut queue = self
58            .queue
59            .lock()
60            .map_err(|_| "tool host event outcome buffer poisoned".to_string())?;
61        queue.push(outcome);
62        Ok(())
63    }
64
65    pub(crate) fn drain(&self) -> Result<Vec<ToolHostEventEffectOutcome>, String> {
66        let mut queue = self
67            .queue
68            .lock()
69            .map_err(|_| "tool host event outcome buffer poisoned".to_string())?;
70        Ok(queue.drain(..).collect())
71    }
72}
73
74#[derive(Clone)]
75pub struct ToolDispatchContext<'run> {
76    pub plugins: Arc<PluginSession>,
77    pub tools: Arc<dyn ToolProvider>,
78    pub surface: Arc<ToolSurface>,
79    pub sessions: Arc<dyn SessionStateService>,
80    pub session_lifecycle: Arc<dyn SessionLifecycleService>,
81    pub session_graph: Arc<dyn SessionGraphService>,
82    pub processes: Arc<dyn crate::ProcessService>,
83    pub process_cancel_ability: Arc<dyn crate::ProcessCancelAbility>,
84    pub host_event_router: Option<crate::HostEventRouter>,
85    pub(crate) effect_controller: crate::runtime::RuntimeEffectControllerHandle<'run>,
86    pub(crate) direct_completions: crate::DirectCompletionClient<'run>,
87    pub(crate) parent_invocation: Option<crate::RuntimeInvocation>,
88    pub(crate) execution_env_spec: crate::ProcessExecutionEnvSpec,
89    pub session_id: String,
90    pub agent_frame_id: crate::AgentFrameId,
91    pub event_tx: mpsc::Sender<SessionEvent>,
92    pub(crate) checkpoint_messages: CheckpointMessageBuffer,
93    pub(crate) host_event_outcomes: ToolHostEventOutcomeBuffer,
94    pub attachment_store: Arc<dyn crate::AttachmentStore>,
95    pub turn_context: crate::TurnContext,
96}
97
98impl<'run> ToolDispatchContext<'run> {
99    pub fn process_scope(&self) -> crate::ProcessOpScope<'_> {
100        crate::ProcessOpScope::new(self.effect_controller.scoped())
101            .with_parent_invocation(self.parent_invocation.clone())
102            .with_agent_frame_id(Some(self.agent_frame_id.clone()))
103    }
104}
105
106#[derive(Clone)]
107pub(crate) struct ToolDispatchOutcome {
108    pub record: ToolCallRecord,
109}
110
111pub(crate) enum ToolPreparationOutcome {
112    Prepared(PreparedToolCall),
113    Completed(Box<ToolDispatchOutcome>),
114}
115
116pub(super) fn completed_preparation(outcome: ToolDispatchOutcome) -> ToolPreparationOutcome {
117    ToolPreparationOutcome::Completed(Box::new(outcome))
118}
119pub(super) fn outcome(
120    tool_name: String,
121    args: serde_json::Value,
122    result: ToolResult,
123    duration_ms: u64,
124) -> ToolDispatchOutcome {
125    let record = ToolCallRecord {
126        call_id: None,
127        tool: tool_name,
128        args,
129        output: result.into_output(),
130        duration_ms,
131    };
132    ToolDispatchOutcome { record }
133}
134
135pub(super) fn runtime_failure(
136    class: ToolFailureClass,
137    code: impl Into<String>,
138    message: impl Into<String>,
139) -> ToolResult {
140    ToolResult::failure(ToolFailure::runtime(class, code, message))
141}