1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use sha2::{Digest, Sha256};
5use tokio::task::JoinSet;
6
7use super::*;
8use crate::session_model::plugin_message_to_message;
9
10async fn collect_owned_async<C, O, H, F>(
11 hooks: &[RegisteredHook<H>],
12 ctx: C,
13 invoke: F,
14) -> Result<Vec<PluginOwned<O>>, PluginError>
15where
16 C: Clone,
17 F: Fn(&H, C) -> PluginFuture<Vec<O>>,
18{
19 let mut out = Vec::new();
20 for registered in hooks {
21 for value in invoke(®istered.hook, ctx.clone()).await? {
22 out.push(PluginOwned {
23 plugin_id: registered.plugin_id.clone(),
24 value,
25 });
26 }
27 }
28 Ok(out)
29}
30
31fn collect_owned_sync<C, O, H, F>(
32 hooks: &[RegisteredHook<H>],
33 ctx: C,
34 invoke: F,
35) -> Result<Vec<PluginOwned<O>>, PluginError>
36where
37 C: Clone,
38 F: Fn(&H, C) -> Result<O, PluginError>,
39{
40 let mut out = Vec::new();
41 for registered in hooks {
42 out.push(PluginOwned {
43 plugin_id: registered.plugin_id.clone(),
44 value: invoke(®istered.hook, ctx.clone())?,
45 });
46 }
47 Ok(out)
48}
49
50fn merge_string_array(
51 obj: &mut serde_json::Map<String, serde_json::Value>,
52 key: &str,
53 values: Vec<String>,
54) {
55 let mut existing = obj
56 .remove(key)
57 .and_then(|value| value.as_array().cloned())
58 .unwrap_or_default()
59 .into_iter()
60 .filter_map(|value| value.as_str().map(str::to_string))
61 .collect::<BTreeSet<_>>();
62 existing.extend(
63 values
64 .into_iter()
65 .map(|value| value.trim().to_string())
66 .filter(|value| !value.is_empty()),
67 );
68 if !existing.is_empty() {
69 obj.insert(key.to_string(), serde_json::json!(existing));
70 }
71}
72
73fn apply_tool_discovery_contributions(
74 catalog: &mut [serde_json::Value],
75 contributions: impl IntoIterator<Item = ToolDiscoveryContribution>,
76) {
77 let mut by_name = BTreeMap::new();
78 for (idx, tool) in catalog.iter().enumerate() {
79 if let Some(name) = tool.get("name").and_then(serde_json::Value::as_str) {
80 by_name.insert(name.to_string(), idx);
81 }
82 }
83
84 for contribution in contributions {
85 for patch in contribution.tools {
86 let Some(idx) = by_name.get(&patch.tool_name).copied() else {
87 continue;
88 };
89 let Some(obj) = catalog[idx].as_object_mut() else {
90 continue;
91 };
92 if let Some(namespace) = patch
93 .namespace
94 .map(|value| value.trim().to_string())
95 .filter(|value| !value.is_empty())
96 {
97 obj.insert("namespace".to_string(), serde_json::json!(namespace));
98 }
99 merge_string_array(obj, "aliases", patch.aliases);
100 }
101 }
102}
103
104fn append_plugin_messages(
105 messages: &mut crate::MessageSequence,
106 plugin_messages: &[PluginMessage],
107) {
108 let new_messages = plugin_messages
109 .iter()
110 .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
111 .map(plugin_message_to_message)
112 .collect::<Vec<_>>();
113 if !new_messages.is_empty() {
114 messages.extend(new_messages);
115 }
116}
117
118struct EmptySnapshotReader;
119
120impl SnapshotReader for EmptySnapshotReader {
121 fn read_blob(&self, _name: &str) -> Option<&[u8]> {
122 None
123 }
124}
125
/// Per-session view of the plugin system: the plugins attached to one
/// session together with the hooks, tools and mode plugins they registered.
pub struct PluginSession {
    /// Host used to build forked sessions (see the `fork_*` methods).
    pub(super) host: PluginHost,
    /// Id of this session; also the default session for plugin actions
    /// invoked with `default_to_current_session`.
    pub(super) session_id: String,
    /// Execution mode of this session; mode-native tools are only added to
    /// tool surfaces resolved for this mode.
    pub(super) execution_mode: ExecutionMode,
    /// Plugins whose state is captured by `snapshot` and reloaded by `restore`.
    pub(super) plugins: Vec<Arc<dyn SessionPlugin>>,
    /// Provider of the base tool manifests and tool contracts.
    pub(super) tools: Arc<dyn ToolProvider>,
    /// Tool registry; its exported state is handed to forked sessions.
    pub(super) tool_registry: Arc<crate::ToolRegistry>,
    /// Contribution appended after all hook contributions when the tool
    /// surface is resolved; forks inherit it unless given a replacement.
    pub(super) tool_surface_overlay: ToolSurfaceContribution,
    /// Tool access authority: a non-empty tool list replaces the provided
    /// tools, and tools it hides are switched off.
    pub(super) tool_access: SessionToolAccess,
    /// Sub-agent authority, if any; forwarded to tool-surface contributors.
    pub(super) subagent: Option<SubagentSessionAuthority>,
    /// Hooks queried by `collect_prompt_contributions`.
    pub(super) prompt_contributors: Vec<RegisteredHook<PromptContributor>>,
    /// Hooks queried by `resolve_tool_surface`.
    pub(super) tool_surface_contributors: Vec<RegisteredHook<ToolSurfaceContributor>>,
    /// Hooks queried by `tool_catalog`.
    pub(super) tool_discovery_contributors: Vec<RegisteredHook<ToolDiscoveryContributor>>,
    /// Hooks run by `before_turn`.
    pub(super) before_turn_hooks: Vec<RegisteredHook<BeforeTurnHook>>,
    /// Hooks run by `before_tool_call`.
    pub(super) before_tool_call_hooks: Vec<RegisteredHook<BeforeToolCallHook>>,
    /// Hooks run by `after_tool_call`.
    pub(super) after_tool_call_hooks: Vec<RegisteredHook<AfterToolCallHook>>,
    /// Hooks run by `after_turn`.
    pub(super) after_turn_hooks: Vec<RegisteredHook<AfterTurnHook>>,
    /// Hooks run by `at_checkpoint`.
    pub(super) checkpoint_hooks: Vec<RegisteredHook<CheckpointHook>>,
    /// Hooks chained by `transform_assistant_stream`.
    pub(super) assistant_stream_hooks: Vec<RegisteredHook<AssistantStreamHook>>,
    /// Hooks chained by `transform_assistant_response`.
    pub(super) assistant_response_hooks: Vec<RegisteredHook<AssistantResponseHook>>,
    /// Optional single projector used by `project_tool_result`.
    pub(super) tool_result_projector: Option<RegisteredExclusiveHook<ToolResultProjector>>,
    /// Hooks spawned as tasks by `emit_runtime_event`.
    pub(super) runtime_event_hooks: Vec<PluginRuntimeEventHook>,
    /// Mutators applied in order by `mutate_session_config`.
    pub(super) session_config_mutators: Vec<SessionConfigMutator>,
    /// Registered plugin actions, keyed by action name.
    pub(super) plugin_actions: BTreeMap<String, RegisteredPluginAction>,
    /// Monitor specs exposed through `monitor_specs`, tagged by owning plugin.
    pub(super) monitor_specs: Vec<PluginOwned<crate::MonitorSpec>>,
    /// Transforms applied in order by `prepare_turn_context`.
    pub(super) turn_context_transforms: Vec<Arc<dyn TurnContextTransform>>,
    /// Rewriters applied in order by `rewrite_history`.
    pub(super) history_rewriters: Vec<Arc<dyn HistoryRewriter>>,
    /// Mode session plugin exposed through `mode_session`.
    pub(super) mode_session: Arc<dyn ModeSessionPlugin>,
    /// Providers of mode-native tool manifests and contracts.
    pub(super) mode_native_tools: Vec<Arc<dyn ModeNativeToolsPlugin>>,
    /// Optional mode protocol driver exposed through `mode_protocol_driver`.
    pub(super) mode_protocol_driver: Option<Arc<dyn ModeProtocolDriverPlugin>>,
}
157impl PluginSession {
158 pub fn session_id(&self) -> &str {
159 &self.session_id
160 }
161
162 pub fn execution_mode(&self) -> ExecutionMode {
163 self.execution_mode.clone()
164 }
165
166 pub fn tool_access(&self) -> &SessionToolAccess {
167 &self.tool_access
168 }
169
170 pub fn subagent_authority(&self) -> Option<&SubagentSessionAuthority> {
171 self.subagent.as_ref()
172 }
173
174 pub fn host(&self) -> &PluginHost {
175 &self.host
176 }
177
178 pub fn tools(&self) -> Arc<dyn ToolProvider> {
179 Arc::clone(&self.tools)
180 }
181
182 pub fn tool_registry(&self) -> Arc<crate::ToolRegistry> {
183 Arc::clone(&self.tool_registry)
184 }
185
186 pub(crate) fn mode_session(&self) -> &Arc<dyn ModeSessionPlugin> {
187 &self.mode_session
188 }
189
190 pub(crate) fn mode_native_tools(&self) -> &[Arc<dyn ModeNativeToolsPlugin>] {
191 &self.mode_native_tools
192 }
193
194 pub(crate) fn mode_native_tool_manifests(&self) -> Vec<ToolManifest> {
195 self.mode_native_tools
196 .iter()
197 .flat_map(|provider| provider.tool_manifests())
198 .collect()
199 }
200
201 pub fn mode_protocol_driver(&self) -> Option<Arc<dyn ModeProtocolDriverPlugin>> {
205 self.mode_protocol_driver.clone()
206 }
207
208 pub fn tool_surface(&self, session_id: &str, mode: ExecutionMode) -> Arc<crate::ToolSurface> {
209 let mut tools = self.tools.tool_manifests();
210 let contract_provider = Arc::clone(&self.tools);
211 let native_contract_providers = self.mode_native_tools.to_vec();
212 let resolve_contract: lash_sansio::ToolContractResolver = Arc::new(move |name: &str| {
213 contract_provider.resolve_contract(name).or_else(|| {
214 native_contract_providers
215 .iter()
216 .find_map(|provider| provider.resolve_contract(name))
217 })
218 });
219 if mode == self.execution_mode {
220 let native_tools = self.mode_native_tool_manifests();
221 tools.extend(native_tools);
222 }
223 match self.resolve_tool_surface(ToolSurfaceContext {
224 session_id: session_id.to_string(),
225 mode: mode.clone(),
226 tools,
227 resolve_contract: Some(Arc::clone(&resolve_contract)),
228 tool_access: self.tool_access.clone(),
229 subagent: self.subagent.clone(),
230 }) {
231 Ok(surface) => Arc::new(surface),
232 Err(err) => {
233 tracing::warn!("failed to resolve tool surface: {err}");
234 let mut fallback_tools = self.tools.tool_manifests();
235 if mode == self.execution_mode {
236 let native_tools = self.mode_native_tool_manifests();
237 fallback_tools.extend(native_tools);
238 }
239 Arc::new(crate::build_tool_surface(crate::ToolSurfaceBuildInput {
240 tools: fallback_tools,
241 mode,
242 resolve_contract: Some(resolve_contract),
243 contributions: Vec::new(),
244 }))
245 }
246 }
247 }
248
249 pub fn tool_catalog(&self, session_id: &str, mode: ExecutionMode) -> Vec<serde_json::Value> {
250 let surface = self.tool_surface(session_id, mode.clone());
251 let mut catalog =
252 crate::tool_registry::project_tool_catalog(surface.searchable_tools_iter().cloned());
253 let contributions = collect_owned_sync(
254 &self.tool_discovery_contributors,
255 ToolDiscoveryContext {
256 session_id: session_id.to_string(),
257 mode,
258 catalog: catalog.clone(),
259 },
260 |hook, ctx| hook(ctx),
261 )
262 .unwrap_or_else(|err| {
263 tracing::warn!("failed to resolve tool discovery metadata: {err}");
264 Vec::new()
265 });
266 apply_tool_discovery_contributions(
267 &mut catalog,
268 contributions.into_iter().map(|owned| owned.value),
269 );
270 catalog
271 }
272
273 pub fn resolve_tool_surface(
274 &self,
275 ctx: ToolSurfaceContext,
276 ) -> Result<crate::ToolSurface, PluginError> {
277 let mut contributions = collect_owned_sync(
278 &self.tool_surface_contributors,
279 ToolSurfaceContext {
280 session_id: ctx.session_id.clone(),
281 mode: ctx.mode.clone(),
282 tools: ctx.tools.clone(),
283 resolve_contract: ctx.resolve_contract.clone(),
284 tool_access: ctx.tool_access.clone(),
285 subagent: ctx.subagent.clone(),
286 },
287 |hook, ctx| hook(ctx),
288 )?
289 .into_iter()
290 .map(|owned| owned.value)
291 .collect::<Vec<_>>();
292 contributions.push(self.tool_surface_overlay.clone());
293 let (tools, resolve_contract) = if ctx.tool_access.tools.is_empty() {
294 (ctx.tools, ctx.resolve_contract)
295 } else {
296 let contracts = ctx
297 .tool_access
298 .tools
299 .iter()
300 .map(|tool| (tool.name.clone(), Arc::new(tool.contract())))
301 .collect::<BTreeMap<_, _>>();
302 (
303 ctx.tool_access
304 .tools
305 .iter()
306 .map(|tool| tool.manifest())
307 .collect(),
308 Some(Arc::new(move |name: &str| contracts.get(name).cloned())
309 as lash_sansio::ToolContractResolver),
310 )
311 };
312 let authority_hidden_tools = tools
313 .iter()
314 .filter(|tool| ctx.tool_access.hides(&tool.name))
315 .map(|tool| tool.name.clone())
316 .collect::<BTreeSet<_>>();
317 if !authority_hidden_tools.is_empty() {
318 contributions.push(ToolSurfaceContribution {
319 overrides: authority_hidden_tools
320 .into_iter()
321 .map(|tool_name| ToolSurfaceOverride {
322 tool_name,
323 availability: Some(crate::ToolAvailability::Off),
324 })
325 .collect(),
326 ..Default::default()
327 });
328 }
329 Ok(crate::build_tool_surface(crate::ToolSurfaceBuildInput {
330 tools,
331 mode: ctx.mode,
332 resolve_contract,
333 contributions,
334 }))
335 }
336
337 pub fn plugin_actions(&self) -> Vec<PluginActionDef> {
338 self.plugin_actions
339 .values()
340 .map(|op| op.def.clone())
341 .collect()
342 }
343
344 pub fn monitor_specs(&self) -> &[PluginOwned<crate::MonitorSpec>] {
345 self.monitor_specs.as_slice()
346 }
347
348 pub fn has_assistant_stream_hooks(&self) -> bool {
349 !self.assistant_stream_hooks.is_empty()
350 }
351
352 pub async fn prepare_turn_context(
355 &self,
356 ctx: &TurnTransformContext,
357 input: crate::session_model::context::PreparedContext,
358 ) -> Result<crate::session_model::context::PreparedContext, HistoryError> {
359 let mut current = input;
360 for transform in &self.turn_context_transforms {
361 current = transform.transform(ctx, current).await?;
362 }
363 Ok(current)
364 }
365
366 pub async fn rewrite_history(
369 &self,
370 ctx: &RewriteContext,
371 input: HistoryState,
372 ) -> Result<HistoryState, HistoryError> {
373 let mut current = input;
374 for rewriter in &self.history_rewriters {
375 if !rewriter.accepts(&ctx.trigger) {
376 continue;
377 }
378 current = rewriter.rewrite(ctx, current).await?;
379 }
380 Ok(current)
381 }
382
383 pub async fn collect_prompt_contributions(
384 &self,
385 ctx: PromptHookContext,
386 ) -> Result<Vec<PromptContribution>, PluginError> {
387 let mut out = collect_owned_async(&self.prompt_contributors, ctx, |hook, ctx| hook(ctx))
388 .await?
389 .into_iter()
390 .map(|owned| owned.value)
391 .collect::<Vec<_>>();
392 let mut seen = BTreeSet::new();
393 out.retain(|contribution| {
394 seen.insert((
395 format!("{:?}", contribution.slot),
396 contribution.priority,
397 contribution.content.trim().to_string(),
398 ))
399 });
400 out.sort_by(|a, b| {
401 format!("{:?}", a.slot)
402 .cmp(&format!("{:?}", b.slot))
403 .then(a.priority.cmp(&b.priority))
404 });
405 Ok(out)
406 }
407
408 async fn apply_turn_directives(
409 &self,
410 directives: Vec<PluginOwned<PluginDirective>>,
411 mut messages: crate::MessageSequence,
412 host: Arc<dyn TurnHookHost>,
413 allow_abort: bool,
414 invalid_context: &'static str,
415 ) -> Result<TurnPreparation, PluginError> {
416 let mut events = Vec::new();
417 let mut abort = None;
418
419 for emitted in directives {
420 match emitted.value {
421 PluginDirective::AbortTurn { code, message } => {
422 if !allow_abort {
423 return Err(PluginError::Session(invalid_context.to_string()));
424 }
425 abort = Some(PluginAbort { code, message });
426 }
427 PluginDirective::EnqueueMessages {
428 messages: plugin_messages,
429 } => append_plugin_messages(&mut messages, &plugin_messages),
430 PluginDirective::CreateSession { request } => {
431 host.create_session(*request)
432 .await
433 .map_err(|err| PluginError::Session(err.to_string()))?;
434 }
435 PluginDirective::HandoffSession { .. } => {
436 return Err(PluginError::Session(invalid_context.to_string()));
437 }
438 PluginDirective::EmitEvents { events: surface } => {
439 events.extend(crate::plugin::plugin_surface_session_events(
440 &emitted.plugin_id,
441 surface,
442 ));
443 }
444 PluginDirective::EmitTrace {
445 name,
446 payload,
447 context,
448 } => {
449 host.emit_trace_event(
450 *context,
451 lash_trace::TraceEvent::Custom {
452 name: format!("plugin.{}.{}", emitted.plugin_id, name),
453 payload,
454 },
455 )
456 .await?;
457 }
458 PluginDirective::ReplaceToolArgs { .. }
459 | PluginDirective::ShortCircuitTool { .. } => {
460 return Err(PluginError::Session(invalid_context.to_string()));
461 }
462 }
463 }
464
465 Ok(TurnPreparation {
466 messages,
467 events,
468 abort,
469 })
470 }
471
472 pub async fn prepare_turn(
473 &self,
474 request: PrepareTurnRequest,
475 ) -> Result<TurnPreparation, PluginError> {
476 let PrepareTurnRequest {
477 session_id,
478 state,
479 messages,
480 host,
481 turn_context,
482 } = request;
483 let directives = self
484 .before_turn(TurnHookContext {
485 session_id,
486 state,
487 host: host.clone(),
488 turn_context,
489 })
490 .await?;
491 self.apply_turn_directives(
492 directives,
493 messages,
494 host,
495 true,
496 "tool directives are not valid in before_turn",
497 )
498 .await
499 }
500
501 pub async fn apply_checkpoint(
502 &self,
503 ctx: CheckpointHookContext,
504 ) -> Result<CheckpointApplication, PluginError> {
505 let directives = self.at_checkpoint(ctx.clone()).await?;
506 let mut messages = Vec::new();
507 let mut events = Vec::new();
508 let mut abort = None;
509
510 for emitted in directives {
511 match emitted.value {
512 PluginDirective::EnqueueMessages { messages: queued } => messages.extend(queued),
513 PluginDirective::CreateSession { request } => {
514 ctx.host
515 .create_session(*request)
516 .await
517 .map_err(|err| PluginError::Session(err.to_string()))?;
518 }
519 PluginDirective::HandoffSession { .. } => {
520 return Err(PluginError::Session(
521 "checkpoint hooks do not support session handoff".to_string(),
522 ));
523 }
524 PluginDirective::AbortTurn { code, message } => {
525 abort = Some(PluginAbort { code, message });
526 }
527 PluginDirective::EmitEvents { events: surface } => {
528 events.extend(crate::plugin::plugin_surface_session_events(
529 &emitted.plugin_id,
530 surface,
531 ));
532 }
533 PluginDirective::EmitTrace {
534 name,
535 payload,
536 context,
537 } => {
538 ctx.host
539 .emit_trace_event(
540 *context,
541 lash_trace::TraceEvent::Custom {
542 name: format!("plugin.{}.{}", emitted.plugin_id, name),
543 payload,
544 },
545 )
546 .await?;
547 }
548 PluginDirective::ReplaceToolArgs { .. }
549 | PluginDirective::ShortCircuitTool { .. } => {
550 return Err(PluginError::Session(
551 "checkpoint hooks only support abort, message enqueue, session creation, events, and trace events"
552 .to_string(),
553 ));
554 }
555 }
556 }
557
558 Ok(CheckpointApplication {
559 messages,
560 events,
561 abort,
562 })
563 }
564
565 pub async fn before_turn(
566 &self,
567 ctx: TurnHookContext,
568 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
569 collect_owned_async(&self.before_turn_hooks, ctx, |hook, ctx| hook(ctx)).await
570 }
571
572 pub async fn before_tool_call(
573 &self,
574 ctx: ToolCallHookContext,
575 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
576 collect_owned_async(&self.before_tool_call_hooks, ctx, |hook, ctx| hook(ctx)).await
577 }
578
579 pub async fn after_tool_call(
580 &self,
581 ctx: ToolResultHookContext,
582 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
583 collect_owned_async(&self.after_tool_call_hooks, ctx, |hook, ctx| hook(ctx)).await
584 }
585
586 pub async fn after_turn(
587 &self,
588 ctx: TurnResultHookContext,
589 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
590 collect_owned_async(&self.after_turn_hooks, ctx, |hook, ctx| hook(ctx)).await
591 }
592
593 pub async fn at_checkpoint(
594 &self,
595 ctx: CheckpointHookContext,
596 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
597 collect_owned_async(&self.checkpoint_hooks, ctx, |hook, ctx| hook(ctx)).await
598 }
599
600 pub async fn transform_assistant_stream(
601 &self,
602 session_id: &str,
603 chunk: String,
604 ) -> Result<Vec<PluginOwned<AssistantStreamTransform>>, PluginError> {
605 let mut current = chunk;
606 let mut transforms = Vec::new();
607 for registered in &self.assistant_stream_hooks {
608 let transform = (registered.hook)(AssistantStreamHookContext {
609 session_id: session_id.to_string(),
610 chunk: current.clone(),
611 })
612 .await?;
613 current = transform.chunk.clone();
614 transforms.push(PluginOwned {
615 plugin_id: registered.plugin_id.clone(),
616 value: transform,
617 });
618 }
619 Ok(transforms)
620 }
621
622 pub async fn transform_assistant_response(
623 &self,
624 session_id: &str,
625 response: crate::llm::types::LlmResponse,
626 ) -> Result<Vec<PluginOwned<AssistantResponseTransform>>, PluginError> {
627 let mut current = response;
628 let mut transforms = Vec::new();
629 for registered in &self.assistant_response_hooks {
630 let transform = (registered.hook)(AssistantResponseHookContext {
631 session_id: session_id.to_string(),
632 response: current.clone(),
633 })
634 .await?;
635 current = transform.response.clone();
636 transforms.push(PluginOwned {
637 plugin_id: registered.plugin_id.clone(),
638 value: transform,
639 });
640 }
641 Ok(transforms)
642 }
643
644 pub async fn project_tool_result(
645 &self,
646 ctx: ToolResultProjectionContext,
647 ) -> Result<crate::ModelToolReturn, PluginError> {
648 let Some(projector) = &self.tool_result_projector else {
649 return Ok(crate::ModelToolReturn::from_output(
650 ctx.call_id.clone(),
651 ctx.tool_name.clone(),
652 &ctx.output,
653 ));
654 };
655 (projector.hook)(ctx).await
656 }
657
658 pub async fn emit_runtime_event(&self, event: PluginRuntimeEvent) {
659 let mut tasks = JoinSet::new();
660 for hook in &self.runtime_event_hooks {
661 let hook = Arc::clone(hook);
662 let event = event.clone();
663 tasks.spawn(async move { hook(event).await });
664 }
665
666 while let Some(result) = tasks.join_next().await {
667 match result {
668 Ok(Ok(())) => {}
669 Ok(Err(err)) => tracing::warn!("plugin runtime event hook failed: {err}"),
670 Err(err) => tracing::warn!("plugin runtime event hook task failed: {err}"),
671 }
672 }
673 }
674
675 pub fn has_runtime_event_hooks(&self) -> bool {
676 !self.runtime_event_hooks.is_empty()
677 }
678
679 pub async fn mutate_session_config(
680 &self,
681 ctx: SessionConfigChangedContext,
682 mut policy: SessionPolicy,
683 ) -> SessionPolicy {
684 for hook in &self.session_config_mutators {
685 match hook(ctx.clone(), policy.clone()).await {
686 Ok(next_policy) => policy = next_policy,
687 Err(err) => tracing::warn!("plugin config mutator failed: {err}"),
688 }
689 }
690 policy
691 }
692
693 pub async fn finalize_turn(
694 &self,
695 mut turn: AssembledTurn,
696 host: Arc<dyn ToolHookHost>,
697 ) -> Result<TurnFinalization, PluginError> {
698 let session_id = turn.state.session_id.clone();
699 let directives = if self.after_turn_hooks.is_empty() {
700 Vec::new()
701 } else {
702 self.after_turn(TurnResultHookContext {
703 session_id: session_id.clone(),
704 turn: Arc::new(crate::plugin::TurnResultSummary::from_assembled(&turn)),
705 host: host.clone(),
706 })
707 .await?
708 };
709 let mut events = Vec::new();
710 let mut updated_messages: Option<crate::MessageSequence> = None;
711 for emitted in directives {
712 match emitted.value {
713 PluginDirective::AbortTurn { .. } => {
714 return Err(PluginError::Session(
715 "only message enqueue and session creation are valid in after_turn"
716 .to_string(),
717 ));
718 }
719 PluginDirective::EnqueueMessages {
720 messages: plugin_messages,
721 } => {
722 let messages = updated_messages.get_or_insert_with(|| {
723 crate::MessageSequence::from_base(
724 turn.state.read_view().messages().to_vec().into(),
725 )
726 });
727 append_plugin_messages(messages, &plugin_messages);
728 }
729 PluginDirective::CreateSession { request } => {
730 host.create_session(*request)
731 .await
732 .map_err(|err| PluginError::Session(err.to_string()))?;
733 }
734 PluginDirective::HandoffSession { .. } => {
735 return Err(PluginError::Session(
736 "after_turn hooks do not support session handoff".to_string(),
737 ));
738 }
739 PluginDirective::EmitEvents { events: surface } => {
740 events.extend(crate::plugin::plugin_surface_session_events(
741 &emitted.plugin_id,
742 surface,
743 ));
744 }
745 PluginDirective::EmitTrace {
746 name,
747 payload,
748 context,
749 } => {
750 host.emit_trace_event(
751 *context,
752 lash_trace::TraceEvent::Custom {
753 name: format!("plugin.{}.{}", emitted.plugin_id, name),
754 payload,
755 },
756 )
757 .await?;
758 }
759 PluginDirective::ReplaceToolArgs { .. }
760 | PluginDirective::ShortCircuitTool { .. } => {
761 return Err(PluginError::Session(
762 "only message enqueue, session creation, events, and trace events are valid in after_turn"
763 .to_string(),
764 ));
765 }
766 }
767 }
768 if let Some(messages) = updated_messages.as_ref() {
769 let tool_calls = turn.state.read_view().tool_calls().to_vec();
770 turn.state
771 .replace_active_read_state(messages.as_slice(), &tool_calls);
772 }
773
774 if self.has_runtime_event_hooks() {
775 self.emit_runtime_event(PluginRuntimeEvent::TurnCommitted(Arc::new(turn.clone())))
776 .await;
777 }
778
779 Ok(TurnFinalization { turn, events })
780 }
781
782 pub fn snapshot(&self) -> Result<PluginSessionSnapshot, PluginError> {
783 let mut plugins = BTreeMap::new();
784 for plugin in &self.plugins {
785 let mut writer = InMemorySnapshotWriter::default();
786 let meta = plugin.snapshot(&mut writer)?;
787 plugins.insert(
788 plugin.id().to_string(),
789 PluginSnapshotEntry {
790 meta,
791 artifacts: writer.finish(),
792 },
793 );
794 }
795 Ok(PluginSessionSnapshot { plugins })
796 }
797
798 pub fn snapshot_is_current(&self, previous: Option<&PluginSessionSnapshot>) -> bool {
799 let Some(previous) = previous else {
800 return false;
801 };
802 if previous.plugins.len() != self.plugins.len() {
803 return false;
804 }
805 for plugin in &self.plugins {
806 let Some(entry) = previous.plugins.get(plugin.id()) else {
807 return false;
808 };
809 if entry.meta.plugin_version != plugin.version()
810 || entry.meta.revision != plugin.snapshot_revision()
811 {
812 return false;
813 }
814 }
815 true
816 }
817
818 pub fn snapshot_revision_fingerprint(&self) -> u64 {
819 let mut hasher = Sha256::new();
820 for plugin in &self.plugins {
821 hasher.update(plugin.id().as_bytes());
822 hasher.update([0]);
823 hasher.update(plugin.version().as_bytes());
824 hasher.update([0]);
825 hasher.update(plugin.snapshot_revision().to_le_bytes());
826 hasher.update([0xff]);
827 }
828 let digest = hasher.finalize();
829 u64::from_le_bytes(digest[..8].try_into().expect("digest prefix"))
830 }
831
832 pub fn restore(&self, snapshot: &PluginSessionSnapshot) -> Result<(), PluginError> {
833 for plugin in &self.plugins {
834 if let Some(entry) = snapshot.plugins.get(plugin.id()) {
835 let reader = InMemorySnapshotReader { entry };
836 plugin.restore(&entry.meta, &reader)?;
837 } else {
838 plugin.restore(
839 &PluginSnapshotMeta {
840 plugin_id: plugin.id().to_string(),
841 plugin_version: plugin.version().to_string(),
842 revision: plugin.snapshot_revision(),
843 state: None,
844 },
845 &EmptySnapshotReader,
846 )?;
847 }
848 }
849 Ok(())
850 }
851
852 pub fn fork_for_session(
853 &self,
854 session_id: impl Into<String>,
855 execution_mode: ExecutionMode,
856 standard_context_approach: Option<crate::StandardContextApproach>,
857 ) -> Result<Arc<PluginSession>, PluginError> {
858 let snapshot = self.snapshot()?;
859 self.host.build_session_with_surface(
860 session_id,
861 execution_mode,
862 standard_context_approach,
863 Some(&snapshot),
864 self.tool_surface_overlay.clone(),
865 Some(self.tool_registry.export_state()),
866 )
867 }
868
869 pub fn fork_for_child_session(
870 &self,
871 session_id: impl Into<String>,
872 parent_session_id: Option<String>,
873 execution_mode: ExecutionMode,
874 standard_context_approach: Option<crate::StandardContextApproach>,
875 authority: super::SessionAuthorityContext,
876 ) -> Result<Arc<PluginSession>, PluginError> {
877 let snapshot = self.snapshot()?;
878 self.host.build_session_with_parent_and_surface(
879 session_id,
880 parent_session_id,
881 execution_mode,
882 standard_context_approach,
883 Some(&snapshot),
884 self.tool_surface_overlay.clone(),
885 Some(self.tool_registry.export_state()),
886 authority,
887 )
888 }
889
890 pub fn fork_for_session_with_tool_surface(
891 &self,
892 session_id: impl Into<String>,
893 execution_mode: ExecutionMode,
894 standard_context_approach: Option<crate::StandardContextApproach>,
895 tool_surface_overlay: ToolSurfaceContribution,
896 ) -> Result<Arc<PluginSession>, PluginError> {
897 let snapshot = self.snapshot()?;
898 self.host.build_session_with_surface(
899 session_id,
900 execution_mode,
901 standard_context_approach,
902 Some(&snapshot),
903 tool_surface_overlay,
904 Some(self.tool_registry.export_state()),
905 )
906 }
907
908 pub async fn invoke_plugin_action(
909 &self,
910 name: &str,
911 args: serde_json::Value,
912 session_id: Option<String>,
913 default_to_current_session: bool,
914 host: Arc<dyn PluginActionHost>,
915 ) -> Result<ToolResult, PluginActionInvokeError> {
916 let Some(op) = self.plugin_actions.get(name).cloned() else {
917 return Err(PluginActionInvokeError::Unknown(name.to_string()));
918 };
919
920 let effective_session = session_id.or_else(|| {
921 if default_to_current_session && !self.session_id.is_empty() {
922 Some(self.session_id.clone())
923 } else {
924 None
925 }
926 });
927
928 match (op.def.session_param, effective_session.as_ref()) {
929 (SessionParam::Required, None) => {
930 return Err(PluginActionInvokeError::MissingSession(name.to_string()));
931 }
932 (SessionParam::Forbidden, Some(_)) => {
933 return Err(PluginActionInvokeError::UnexpectedSession(name.to_string()));
934 }
935 _ => {}
936 }
937
938 Ok((op.handler)(
939 PluginActionContext {
940 session_id: effective_session,
941 host,
942 },
943 args,
944 )
945 .await)
946 }
947
948 pub async fn call_plugin_action<Op: PluginAction>(
949 &self,
950 args: Op::Args,
951 session_id: Option<String>,
952 default_to_current_session: bool,
953 host: Arc<dyn PluginActionHost>,
954 ) -> Result<Op::Output, PluginError> {
955 let args = serde_json::to_value(args)
956 .map_err(|err| PluginError::Invoke(format!("invalid {} args: {err}", Op::NAME)))?;
957 let result = self
958 .invoke_plugin_action(Op::NAME, args, session_id, default_to_current_session, host)
959 .await
960 .map_err(|err| PluginError::Invoke(err.to_string()))?;
961 if !result.is_success() {
962 return Err(PluginError::Invoke(format!(
963 "{} failed: {}",
964 Op::NAME,
965 result.value_for_projection()
966 )));
967 }
968 serde_json::from_value(result.into_value_for_projection())
969 .map_err(|err| PluginError::Invoke(format!("invalid {} output: {err}", Op::NAME)))
970 }
971}