Skip to main content

lash_core/plugin/
session_obj.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use sha2::{Digest, Sha256};
5use tokio::task::JoinSet;
6
7use super::*;
8use crate::session_model::plugin_message_to_message;
9
10async fn collect_owned_async<C, O, H, F>(
11    hooks: &[RegisteredHook<H>],
12    ctx: C,
13    invoke: F,
14) -> Result<Vec<PluginOwned<O>>, PluginError>
15where
16    C: Clone,
17    F: Fn(&H, C) -> PluginFuture<Vec<O>>,
18{
19    let mut out = Vec::new();
20    for registered in hooks {
21        for value in invoke(&registered.hook, ctx.clone()).await? {
22            out.push(PluginOwned {
23                plugin_id: registered.plugin_id.clone(),
24                value,
25            });
26        }
27    }
28    Ok(out)
29}
30
31fn collect_owned_sync<C, O, H, F>(
32    hooks: &[RegisteredHook<H>],
33    ctx: C,
34    invoke: F,
35) -> Result<Vec<PluginOwned<O>>, PluginError>
36where
37    C: Clone,
38    F: Fn(&H, C) -> Result<O, PluginError>,
39{
40    let mut out = Vec::new();
41    for registered in hooks {
42        out.push(PluginOwned {
43            plugin_id: registered.plugin_id.clone(),
44            value: invoke(&registered.hook, ctx.clone())?,
45        });
46    }
47    Ok(out)
48}
49
50fn merge_string_array(
51    obj: &mut serde_json::Map<String, serde_json::Value>,
52    key: &str,
53    values: Vec<String>,
54) {
55    let mut existing = obj
56        .remove(key)
57        .and_then(|value| value.as_array().cloned())
58        .unwrap_or_default()
59        .into_iter()
60        .filter_map(|value| value.as_str().map(str::to_string))
61        .collect::<BTreeSet<_>>();
62    existing.extend(
63        values
64            .into_iter()
65            .map(|value| value.trim().to_string())
66            .filter(|value| !value.is_empty()),
67    );
68    if !existing.is_empty() {
69        obj.insert(key.to_string(), serde_json::json!(existing));
70    }
71}
72
73fn apply_tool_discovery_contributions(
74    catalog: &mut [serde_json::Value],
75    contributions: impl IntoIterator<Item = ToolDiscoveryContribution>,
76) {
77    let mut by_name = BTreeMap::new();
78    for (idx, tool) in catalog.iter().enumerate() {
79        if let Some(name) = tool.get("name").and_then(serde_json::Value::as_str) {
80            by_name.insert(name.to_string(), idx);
81        }
82    }
83
84    for contribution in contributions {
85        for patch in contribution.tools {
86            let Some(idx) = by_name.get(&patch.tool_name).copied() else {
87                continue;
88            };
89            let Some(obj) = catalog[idx].as_object_mut() else {
90                continue;
91            };
92            if let Some(namespace) = patch
93                .namespace
94                .map(|value| value.trim().to_string())
95                .filter(|value| !value.is_empty())
96            {
97                obj.insert("namespace".to_string(), serde_json::json!(namespace));
98            }
99            merge_string_array(obj, "aliases", patch.aliases);
100        }
101    }
102}
103
104fn append_plugin_messages(
105    messages: &mut crate::MessageSequence,
106    plugin_messages: &[PluginMessage],
107) {
108    let new_messages = plugin_messages
109        .iter()
110        .filter(|message| matches!(message.role, MessageRole::User | MessageRole::System))
111        .map(plugin_message_to_message)
112        .collect::<Vec<_>>();
113    if !new_messages.is_empty() {
114        messages.extend(new_messages);
115    }
116}
117
/// A `SnapshotReader` that holds no blobs.
///
/// `PluginSession::restore` passes this to plugins that have no entry in the
/// snapshot being restored.
struct EmptySnapshotReader;

impl SnapshotReader for EmptySnapshotReader {
    /// Always returns `None`, whatever `_name` is requested.
    fn read_blob(&self, _name: &str) -> Option<&[u8]> {
        None
    }
}
125
/// Per-session plugin state: the owning host, the session's plugins, its tool
/// providers, and the hook lists that the methods on this type invoke.
pub struct PluginSession {
    // Host used to build forked sessions (see `fork_for_session`).
    pub(super) host: PluginHost,
    pub(super) session_id: String,
    pub(super) execution_mode: ExecutionMode,
    // Plugins whose state is captured by `snapshot` and replayed by `restore`.
    pub(super) plugins: Vec<Arc<dyn SessionPlugin>>,
    pub(super) tools: Arc<dyn ToolProvider>,
    pub(super) tool_registry: Arc<crate::ToolRegistry>,
    // Appended after all plugin contributions in `resolve_tool_surface`.
    pub(super) tool_surface_overlay: ToolSurfaceContribution,
    pub(super) tool_access: SessionToolAccess,
    pub(super) subagent: Option<SubagentSessionAuthority>,
    pub(super) prompt_contributors: Vec<RegisteredHook<PromptContributor>>,
    pub(super) tool_surface_contributors: Vec<RegisteredHook<ToolSurfaceContributor>>,
    pub(super) tool_discovery_contributors: Vec<RegisteredHook<ToolDiscoveryContributor>>,
    pub(super) before_turn_hooks: Vec<RegisteredHook<BeforeTurnHook>>,
    pub(super) before_tool_call_hooks: Vec<RegisteredHook<BeforeToolCallHook>>,
    pub(super) after_tool_call_hooks: Vec<RegisteredHook<AfterToolCallHook>>,
    pub(super) after_turn_hooks: Vec<RegisteredHook<AfterTurnHook>>,
    pub(super) checkpoint_hooks: Vec<RegisteredHook<CheckpointHook>>,
    pub(super) assistant_stream_hooks: Vec<RegisteredHook<AssistantStreamHook>>,
    pub(super) assistant_response_hooks: Vec<RegisteredHook<AssistantResponseHook>>,
    // When `None`, `project_tool_result` falls back to `ModelToolReturn::from_output`.
    pub(super) tool_result_projector: Option<RegisteredExclusiveHook<ToolResultProjector>>,
    // Each hook is spawned as its own task by `emit_runtime_event`.
    pub(super) runtime_event_hooks: Vec<PluginRuntimeEventHook>,
    pub(super) session_config_mutators: Vec<SessionConfigMutator>,
    pub(super) plugin_actions: BTreeMap<String, RegisteredPluginAction>,
    pub(super) monitor_specs: Vec<PluginOwned<crate::MonitorSpec>>,
    // Applied in vector order by `prepare_turn_context`.
    pub(super) turn_context_transforms: Vec<Arc<dyn TurnContextTransform>>,
    // Applied in vector order by `rewrite_history`.
    pub(super) history_rewriters: Vec<Arc<dyn HistoryRewriter>>,
    pub(super) mode_session: Arc<dyn ModeSessionPlugin>,
    pub(super) mode_native_tools: Vec<Arc<dyn ModeNativeToolsPlugin>>,
    pub(super) mode_protocol_driver: Option<Arc<dyn ModeProtocolDriverPlugin>>,
}
157impl PluginSession {
158    pub fn session_id(&self) -> &str {
159        &self.session_id
160    }
161
    /// Returns a clone of the execution mode stored on this session.
    pub fn execution_mode(&self) -> ExecutionMode {
        self.execution_mode.clone()
    }
165
    /// Borrows the session's tool-access settings.
    pub fn tool_access(&self) -> &SessionToolAccess {
        &self.tool_access
    }
169
    /// Borrows the subagent authority attached to this session, if one is set.
    pub fn subagent_authority(&self) -> Option<&SubagentSessionAuthority> {
        self.subagent.as_ref()
    }
173
    /// Borrows the plugin host stored on this session.
    pub fn host(&self) -> &PluginHost {
        &self.host
    }
177
    /// Returns another shared handle to the session's tool provider
    /// (a reference-count bump, not a copy of the provider).
    pub fn tools(&self) -> Arc<dyn ToolProvider> {
        Arc::clone(&self.tools)
    }
181
    /// Returns another shared handle to the session's tool registry
    /// (a reference-count bump, not a copy of the registry).
    pub fn tool_registry(&self) -> Arc<crate::ToolRegistry> {
        Arc::clone(&self.tool_registry)
    }
185
    /// Borrows the shared handle to the session's mode-session plugin.
    pub(crate) fn mode_session(&self) -> &Arc<dyn ModeSessionPlugin> {
        &self.mode_session
    }
189
190    pub(crate) fn mode_native_tools(&self) -> &[Arc<dyn ModeNativeToolsPlugin>] {
191        &self.mode_native_tools
192    }
193
194    pub(crate) fn mode_native_tool_manifests(&self) -> Vec<ToolManifest> {
195        self.mode_native_tools
196            .iter()
197            .flat_map(|provider| provider.tool_manifests())
198            .collect()
199    }
200
201    /// Plugin-registered protocol driver for this session, if any plugin
202    /// claimed the singleton slot. When `None`, callers fall back to
203    /// `lash_sansio::build_mode_preamble` (hardcoded Standard/RLM).
204    pub fn mode_protocol_driver(&self) -> Option<Arc<dyn ModeProtocolDriverPlugin>> {
205        self.mode_protocol_driver.clone()
206    }
207
208    pub fn tool_surface(&self, session_id: &str, mode: ExecutionMode) -> Arc<crate::ToolSurface> {
209        let mut tools = self.tools.tool_manifests();
210        let contract_provider = Arc::clone(&self.tools);
211        let native_contract_providers = self.mode_native_tools.to_vec();
212        let resolve_contract: lash_sansio::ToolContractResolver = Arc::new(move |name: &str| {
213            contract_provider.resolve_contract(name).or_else(|| {
214                native_contract_providers
215                    .iter()
216                    .find_map(|provider| provider.resolve_contract(name))
217            })
218        });
219        if mode == self.execution_mode {
220            let native_tools = self.mode_native_tool_manifests();
221            tools.extend(native_tools);
222        }
223        match self.resolve_tool_surface(ToolSurfaceContext {
224            session_id: session_id.to_string(),
225            mode: mode.clone(),
226            tools,
227            resolve_contract: Some(Arc::clone(&resolve_contract)),
228            tool_access: self.tool_access.clone(),
229            subagent: self.subagent.clone(),
230        }) {
231            Ok(surface) => Arc::new(surface),
232            Err(err) => {
233                tracing::warn!("failed to resolve tool surface: {err}");
234                let mut fallback_tools = self.tools.tool_manifests();
235                if mode == self.execution_mode {
236                    let native_tools = self.mode_native_tool_manifests();
237                    fallback_tools.extend(native_tools);
238                }
239                Arc::new(crate::build_tool_surface(crate::ToolSurfaceBuildInput {
240                    tools: fallback_tools,
241                    mode,
242                    resolve_contract: Some(resolve_contract),
243                    contributions: Vec::new(),
244                }))
245            }
246        }
247    }
248
249    pub fn tool_catalog(&self, session_id: &str, mode: ExecutionMode) -> Vec<serde_json::Value> {
250        let surface = self.tool_surface(session_id, mode.clone());
251        let mut catalog =
252            crate::tool_registry::project_tool_catalog(surface.searchable_tools_iter().cloned());
253        let contributions = collect_owned_sync(
254            &self.tool_discovery_contributors,
255            ToolDiscoveryContext {
256                session_id: session_id.to_string(),
257                mode,
258                catalog: catalog.clone(),
259            },
260            |hook, ctx| hook(ctx),
261        )
262        .unwrap_or_else(|err| {
263            tracing::warn!("failed to resolve tool discovery metadata: {err}");
264            Vec::new()
265        });
266        apply_tool_discovery_contributions(
267            &mut catalog,
268            contributions.into_iter().map(|owned| owned.value),
269        );
270        catalog
271    }
272
273    pub fn resolve_tool_surface(
274        &self,
275        ctx: ToolSurfaceContext,
276    ) -> Result<crate::ToolSurface, PluginError> {
277        let mut contributions = collect_owned_sync(
278            &self.tool_surface_contributors,
279            ToolSurfaceContext {
280                session_id: ctx.session_id.clone(),
281                mode: ctx.mode.clone(),
282                tools: ctx.tools.clone(),
283                resolve_contract: ctx.resolve_contract.clone(),
284                tool_access: ctx.tool_access.clone(),
285                subagent: ctx.subagent.clone(),
286            },
287            |hook, ctx| hook(ctx),
288        )?
289        .into_iter()
290        .map(|owned| owned.value)
291        .collect::<Vec<_>>();
292        contributions.push(self.tool_surface_overlay.clone());
293        let (tools, resolve_contract) = if ctx.tool_access.tools.is_empty() {
294            (ctx.tools, ctx.resolve_contract)
295        } else {
296            let contracts = ctx
297                .tool_access
298                .tools
299                .iter()
300                .map(|tool| (tool.name.clone(), Arc::new(tool.contract())))
301                .collect::<BTreeMap<_, _>>();
302            (
303                ctx.tool_access
304                    .tools
305                    .iter()
306                    .map(|tool| tool.manifest())
307                    .collect(),
308                Some(Arc::new(move |name: &str| contracts.get(name).cloned())
309                    as lash_sansio::ToolContractResolver),
310            )
311        };
312        let authority_hidden_tools = tools
313            .iter()
314            .filter(|tool| ctx.tool_access.hides(&tool.name))
315            .map(|tool| tool.name.clone())
316            .collect::<BTreeSet<_>>();
317        if !authority_hidden_tools.is_empty() {
318            contributions.push(ToolSurfaceContribution {
319                overrides: authority_hidden_tools
320                    .into_iter()
321                    .map(|tool_name| ToolSurfaceOverride {
322                        tool_name,
323                        availability: Some(crate::ToolAvailability::Off),
324                    })
325                    .collect(),
326                ..Default::default()
327            });
328        }
329        Ok(crate::build_tool_surface(crate::ToolSurfaceBuildInput {
330            tools,
331            mode: ctx.mode,
332            resolve_contract,
333            contributions,
334        }))
335    }
336
337    pub fn plugin_actions(&self) -> Vec<PluginActionDef> {
338        self.plugin_actions
339            .values()
340            .map(|op| op.def.clone())
341            .collect()
342    }
343
344    pub fn monitor_specs(&self) -> &[PluginOwned<crate::MonitorSpec>] {
345        self.monitor_specs.as_slice()
346    }
347
    /// Reports whether at least one assistant stream hook is registered.
    pub fn has_assistant_stream_hooks(&self) -> bool {
        !self.assistant_stream_hooks.is_empty()
    }
351
352    /// Chain registered turn-context transforms, piping each one's output
353    /// into the next in priority order.
354    pub async fn prepare_turn_context(
355        &self,
356        ctx: &TurnTransformContext,
357        input: crate::session_model::context::PreparedContext,
358    ) -> Result<crate::session_model::context::PreparedContext, HistoryError> {
359        let mut current = input;
360        for transform in &self.turn_context_transforms {
361            current = transform.transform(ctx, current).await?;
362        }
363        Ok(current)
364    }
365
366    /// Chain registered history rewriters, skipping any that opt out of
367    /// the current trigger via `accepts()`.
368    pub async fn rewrite_history(
369        &self,
370        ctx: &RewriteContext,
371        input: HistoryState,
372    ) -> Result<HistoryState, HistoryError> {
373        let mut current = input;
374        for rewriter in &self.history_rewriters {
375            if !rewriter.accepts(&ctx.trigger) {
376                continue;
377            }
378            current = rewriter.rewrite(ctx, current).await?;
379        }
380        Ok(current)
381    }
382
383    pub async fn collect_prompt_contributions(
384        &self,
385        ctx: PromptHookContext,
386    ) -> Result<Vec<PromptContribution>, PluginError> {
387        let mut out = collect_owned_async(&self.prompt_contributors, ctx, |hook, ctx| hook(ctx))
388            .await?
389            .into_iter()
390            .map(|owned| owned.value)
391            .collect::<Vec<_>>();
392        let mut seen = BTreeSet::new();
393        out.retain(|contribution| {
394            seen.insert((
395                format!("{:?}", contribution.slot),
396                contribution.priority,
397                contribution.content.trim().to_string(),
398            ))
399        });
400        out.sort_by(|a, b| {
401            format!("{:?}", a.slot)
402                .cmp(&format!("{:?}", b.slot))
403                .then(a.priority.cmp(&b.priority))
404        });
405        Ok(out)
406    }
407
408    async fn apply_turn_directives(
409        &self,
410        directives: Vec<PluginOwned<PluginDirective>>,
411        mut messages: crate::MessageSequence,
412        host: Arc<dyn TurnHookHost>,
413        allow_abort: bool,
414        invalid_context: &'static str,
415    ) -> Result<TurnPreparation, PluginError> {
416        let mut events = Vec::new();
417        let mut abort = None;
418
419        for emitted in directives {
420            match emitted.value {
421                PluginDirective::AbortTurn { code, message } => {
422                    if !allow_abort {
423                        return Err(PluginError::Session(invalid_context.to_string()));
424                    }
425                    abort = Some(PluginAbort { code, message });
426                }
427                PluginDirective::EnqueueMessages {
428                    messages: plugin_messages,
429                } => append_plugin_messages(&mut messages, &plugin_messages),
430                PluginDirective::CreateSession { request } => {
431                    host.create_session(*request)
432                        .await
433                        .map_err(|err| PluginError::Session(err.to_string()))?;
434                }
435                PluginDirective::HandoffSession { .. } => {
436                    return Err(PluginError::Session(invalid_context.to_string()));
437                }
438                PluginDirective::EmitEvents { events: surface } => {
439                    events.extend(crate::plugin::plugin_surface_session_events(
440                        &emitted.plugin_id,
441                        surface,
442                    ));
443                }
444                PluginDirective::EmitTrace {
445                    name,
446                    payload,
447                    context,
448                } => {
449                    host.emit_trace_event(
450                        *context,
451                        lash_trace::TraceEvent::Custom {
452                            name: format!("plugin.{}.{}", emitted.plugin_id, name),
453                            payload,
454                        },
455                    )
456                    .await?;
457                }
458                PluginDirective::ReplaceToolArgs { .. }
459                | PluginDirective::ShortCircuitTool { .. } => {
460                    return Err(PluginError::Session(invalid_context.to_string()));
461                }
462            }
463        }
464
465        Ok(TurnPreparation {
466            messages,
467            events,
468            abort,
469        })
470    }
471
472    pub async fn prepare_turn(
473        &self,
474        request: PrepareTurnRequest,
475    ) -> Result<TurnPreparation, PluginError> {
476        let PrepareTurnRequest {
477            session_id,
478            state,
479            messages,
480            host,
481            turn_context,
482        } = request;
483        let directives = self
484            .before_turn(TurnHookContext {
485                session_id,
486                state,
487                host: host.clone(),
488                turn_context,
489            })
490            .await?;
491        self.apply_turn_directives(
492            directives,
493            messages,
494            host,
495            true,
496            "tool directives are not valid in before_turn",
497        )
498        .await
499    }
500
501    pub async fn apply_checkpoint(
502        &self,
503        ctx: CheckpointHookContext,
504    ) -> Result<CheckpointApplication, PluginError> {
505        let directives = self.at_checkpoint(ctx.clone()).await?;
506        let mut messages = Vec::new();
507        let mut events = Vec::new();
508        let mut abort = None;
509
510        for emitted in directives {
511            match emitted.value {
512                PluginDirective::EnqueueMessages { messages: queued } => messages.extend(queued),
513                PluginDirective::CreateSession { request } => {
514                    ctx.host
515                        .create_session(*request)
516                        .await
517                        .map_err(|err| PluginError::Session(err.to_string()))?;
518                }
519                PluginDirective::HandoffSession { .. } => {
520                    return Err(PluginError::Session(
521                        "checkpoint hooks do not support session handoff".to_string(),
522                    ));
523                }
524                PluginDirective::AbortTurn { code, message } => {
525                    abort = Some(PluginAbort { code, message });
526                }
527                PluginDirective::EmitEvents { events: surface } => {
528                    events.extend(crate::plugin::plugin_surface_session_events(
529                        &emitted.plugin_id,
530                        surface,
531                    ));
532                }
533                PluginDirective::EmitTrace {
534                    name,
535                    payload,
536                    context,
537                } => {
538                    ctx.host
539                        .emit_trace_event(
540                            *context,
541                            lash_trace::TraceEvent::Custom {
542                                name: format!("plugin.{}.{}", emitted.plugin_id, name),
543                                payload,
544                            },
545                        )
546                        .await?;
547                }
548                PluginDirective::ReplaceToolArgs { .. }
549                | PluginDirective::ShortCircuitTool { .. } => {
550                    return Err(PluginError::Session(
551                        "checkpoint hooks only support abort, message enqueue, session creation, events, and trace events"
552                            .to_string(),
553                    ));
554                }
555            }
556        }
557
558        Ok(CheckpointApplication {
559            messages,
560            events,
561            abort,
562        })
563    }
564
565    pub async fn before_turn(
566        &self,
567        ctx: TurnHookContext,
568    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
569        collect_owned_async(&self.before_turn_hooks, ctx, |hook, ctx| hook(ctx)).await
570    }
571
572    pub async fn before_tool_call(
573        &self,
574        ctx: ToolCallHookContext,
575    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
576        collect_owned_async(&self.before_tool_call_hooks, ctx, |hook, ctx| hook(ctx)).await
577    }
578
579    pub async fn after_tool_call(
580        &self,
581        ctx: ToolResultHookContext,
582    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
583        collect_owned_async(&self.after_tool_call_hooks, ctx, |hook, ctx| hook(ctx)).await
584    }
585
586    pub async fn after_turn(
587        &self,
588        ctx: TurnResultHookContext,
589    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
590        collect_owned_async(&self.after_turn_hooks, ctx, |hook, ctx| hook(ctx)).await
591    }
592
593    pub async fn at_checkpoint(
594        &self,
595        ctx: CheckpointHookContext,
596    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
597        collect_owned_async(&self.checkpoint_hooks, ctx, |hook, ctx| hook(ctx)).await
598    }
599
600    pub async fn transform_assistant_stream(
601        &self,
602        session_id: &str,
603        chunk: String,
604    ) -> Result<Vec<PluginOwned<AssistantStreamTransform>>, PluginError> {
605        let mut current = chunk;
606        let mut transforms = Vec::new();
607        for registered in &self.assistant_stream_hooks {
608            let transform = (registered.hook)(AssistantStreamHookContext {
609                session_id: session_id.to_string(),
610                chunk: current.clone(),
611            })
612            .await?;
613            current = transform.chunk.clone();
614            transforms.push(PluginOwned {
615                plugin_id: registered.plugin_id.clone(),
616                value: transform,
617            });
618        }
619        Ok(transforms)
620    }
621
622    pub async fn transform_assistant_response(
623        &self,
624        session_id: &str,
625        response: crate::llm::types::LlmResponse,
626    ) -> Result<Vec<PluginOwned<AssistantResponseTransform>>, PluginError> {
627        let mut current = response;
628        let mut transforms = Vec::new();
629        for registered in &self.assistant_response_hooks {
630            let transform = (registered.hook)(AssistantResponseHookContext {
631                session_id: session_id.to_string(),
632                response: current.clone(),
633            })
634            .await?;
635            current = transform.response.clone();
636            transforms.push(PluginOwned {
637                plugin_id: registered.plugin_id.clone(),
638                value: transform,
639            });
640        }
641        Ok(transforms)
642    }
643
644    pub async fn project_tool_result(
645        &self,
646        ctx: ToolResultProjectionContext,
647    ) -> Result<crate::ModelToolReturn, PluginError> {
648        let Some(projector) = &self.tool_result_projector else {
649            return Ok(crate::ModelToolReturn::from_output(
650                ctx.call_id.clone(),
651                ctx.tool_name.clone(),
652                &ctx.output,
653            ));
654        };
655        (projector.hook)(ctx).await
656    }
657
658    pub async fn emit_runtime_event(&self, event: PluginRuntimeEvent) {
659        let mut tasks = JoinSet::new();
660        for hook in &self.runtime_event_hooks {
661            let hook = Arc::clone(hook);
662            let event = event.clone();
663            tasks.spawn(async move { hook(event).await });
664        }
665
666        while let Some(result) = tasks.join_next().await {
667            match result {
668                Ok(Ok(())) => {}
669                Ok(Err(err)) => tracing::warn!("plugin runtime event hook failed: {err}"),
670                Err(err) => tracing::warn!("plugin runtime event hook task failed: {err}"),
671            }
672        }
673    }
674
    /// Reports whether at least one runtime event hook is registered.
    pub fn has_runtime_event_hooks(&self) -> bool {
        !self.runtime_event_hooks.is_empty()
    }
678
679    pub async fn mutate_session_config(
680        &self,
681        ctx: SessionConfigChangedContext,
682        mut policy: SessionPolicy,
683    ) -> SessionPolicy {
684        for hook in &self.session_config_mutators {
685            match hook(ctx.clone(), policy.clone()).await {
686                Ok(next_policy) => policy = next_policy,
687                Err(err) => tracing::warn!("plugin config mutator failed: {err}"),
688            }
689        }
690        policy
691    }
692
693    pub async fn finalize_turn(
694        &self,
695        mut turn: AssembledTurn,
696        host: Arc<dyn ToolHookHost>,
697    ) -> Result<TurnFinalization, PluginError> {
698        let session_id = turn.state.session_id.clone();
699        let directives = if self.after_turn_hooks.is_empty() {
700            Vec::new()
701        } else {
702            self.after_turn(TurnResultHookContext {
703                session_id: session_id.clone(),
704                turn: Arc::new(crate::plugin::TurnResultSummary::from_assembled(&turn)),
705                host: host.clone(),
706            })
707            .await?
708        };
709        let mut events = Vec::new();
710        let mut updated_messages: Option<crate::MessageSequence> = None;
711        for emitted in directives {
712            match emitted.value {
713                PluginDirective::AbortTurn { .. } => {
714                    return Err(PluginError::Session(
715                        "only message enqueue and session creation are valid in after_turn"
716                            .to_string(),
717                    ));
718                }
719                PluginDirective::EnqueueMessages {
720                    messages: plugin_messages,
721                } => {
722                    let messages = updated_messages.get_or_insert_with(|| {
723                        crate::MessageSequence::from_base(
724                            turn.state.read_view().messages().to_vec().into(),
725                        )
726                    });
727                    append_plugin_messages(messages, &plugin_messages);
728                }
729                PluginDirective::CreateSession { request } => {
730                    host.create_session(*request)
731                        .await
732                        .map_err(|err| PluginError::Session(err.to_string()))?;
733                }
734                PluginDirective::HandoffSession { .. } => {
735                    return Err(PluginError::Session(
736                        "after_turn hooks do not support session handoff".to_string(),
737                    ));
738                }
739                PluginDirective::EmitEvents { events: surface } => {
740                    events.extend(crate::plugin::plugin_surface_session_events(
741                        &emitted.plugin_id,
742                        surface,
743                    ));
744                }
745                PluginDirective::EmitTrace {
746                    name,
747                    payload,
748                    context,
749                } => {
750                    host.emit_trace_event(
751                        *context,
752                        lash_trace::TraceEvent::Custom {
753                            name: format!("plugin.{}.{}", emitted.plugin_id, name),
754                            payload,
755                        },
756                    )
757                    .await?;
758                }
759                PluginDirective::ReplaceToolArgs { .. }
760                | PluginDirective::ShortCircuitTool { .. } => {
761                    return Err(PluginError::Session(
762                        "only message enqueue, session creation, events, and trace events are valid in after_turn"
763                            .to_string(),
764                    ));
765                }
766            }
767        }
768        if let Some(messages) = updated_messages.as_ref() {
769            let tool_calls = turn.state.read_view().tool_calls().to_vec();
770            turn.state
771                .replace_active_read_state(messages.as_slice(), &tool_calls);
772        }
773
774        if self.has_runtime_event_hooks() {
775            self.emit_runtime_event(PluginRuntimeEvent::TurnCommitted(Arc::new(turn.clone())))
776                .await;
777        }
778
779        Ok(TurnFinalization { turn, events })
780    }
781
782    pub fn snapshot(&self) -> Result<PluginSessionSnapshot, PluginError> {
783        let mut plugins = BTreeMap::new();
784        for plugin in &self.plugins {
785            let mut writer = InMemorySnapshotWriter::default();
786            let meta = plugin.snapshot(&mut writer)?;
787            plugins.insert(
788                plugin.id().to_string(),
789                PluginSnapshotEntry {
790                    meta,
791                    artifacts: writer.finish(),
792                },
793            );
794        }
795        Ok(PluginSessionSnapshot { plugins })
796    }
797
798    pub fn snapshot_is_current(&self, previous: Option<&PluginSessionSnapshot>) -> bool {
799        let Some(previous) = previous else {
800            return false;
801        };
802        if previous.plugins.len() != self.plugins.len() {
803            return false;
804        }
805        for plugin in &self.plugins {
806            let Some(entry) = previous.plugins.get(plugin.id()) else {
807                return false;
808            };
809            if entry.meta.plugin_version != plugin.version()
810                || entry.meta.revision != plugin.snapshot_revision()
811            {
812                return false;
813            }
814        }
815        true
816    }
817
818    pub fn snapshot_revision_fingerprint(&self) -> u64 {
819        let mut hasher = Sha256::new();
820        for plugin in &self.plugins {
821            hasher.update(plugin.id().as_bytes());
822            hasher.update([0]);
823            hasher.update(plugin.version().as_bytes());
824            hasher.update([0]);
825            hasher.update(plugin.snapshot_revision().to_le_bytes());
826            hasher.update([0xff]);
827        }
828        let digest = hasher.finalize();
829        u64::from_le_bytes(digest[..8].try_into().expect("digest prefix"))
830    }
831
832    pub fn restore(&self, snapshot: &PluginSessionSnapshot) -> Result<(), PluginError> {
833        for plugin in &self.plugins {
834            if let Some(entry) = snapshot.plugins.get(plugin.id()) {
835                let reader = InMemorySnapshotReader { entry };
836                plugin.restore(&entry.meta, &reader)?;
837            } else {
838                plugin.restore(
839                    &PluginSnapshotMeta {
840                        plugin_id: plugin.id().to_string(),
841                        plugin_version: plugin.version().to_string(),
842                        revision: plugin.snapshot_revision(),
843                        state: None,
844                    },
845                    &EmptySnapshotReader,
846                )?;
847            }
848        }
849        Ok(())
850    }
851
852    pub fn fork_for_session(
853        &self,
854        session_id: impl Into<String>,
855        execution_mode: ExecutionMode,
856        standard_context_approach: Option<crate::StandardContextApproach>,
857    ) -> Result<Arc<PluginSession>, PluginError> {
858        let snapshot = self.snapshot()?;
859        self.host.build_session_with_surface(
860            session_id,
861            execution_mode,
862            standard_context_approach,
863            Some(&snapshot),
864            self.tool_surface_overlay.clone(),
865            Some(self.tool_registry.export_state()),
866        )
867    }
868
869    pub fn fork_for_child_session(
870        &self,
871        session_id: impl Into<String>,
872        parent_session_id: Option<String>,
873        execution_mode: ExecutionMode,
874        standard_context_approach: Option<crate::StandardContextApproach>,
875        authority: super::SessionAuthorityContext,
876    ) -> Result<Arc<PluginSession>, PluginError> {
877        let snapshot = self.snapshot()?;
878        self.host.build_session_with_parent_and_surface(
879            session_id,
880            parent_session_id,
881            execution_mode,
882            standard_context_approach,
883            Some(&snapshot),
884            self.tool_surface_overlay.clone(),
885            Some(self.tool_registry.export_state()),
886            authority,
887        )
888    }
889
890    pub fn fork_for_session_with_tool_surface(
891        &self,
892        session_id: impl Into<String>,
893        execution_mode: ExecutionMode,
894        standard_context_approach: Option<crate::StandardContextApproach>,
895        tool_surface_overlay: ToolSurfaceContribution,
896    ) -> Result<Arc<PluginSession>, PluginError> {
897        let snapshot = self.snapshot()?;
898        self.host.build_session_with_surface(
899            session_id,
900            execution_mode,
901            standard_context_approach,
902            Some(&snapshot),
903            tool_surface_overlay,
904            Some(self.tool_registry.export_state()),
905        )
906    }
907
908    pub async fn invoke_plugin_action(
909        &self,
910        name: &str,
911        args: serde_json::Value,
912        session_id: Option<String>,
913        default_to_current_session: bool,
914        host: Arc<dyn PluginActionHost>,
915    ) -> Result<ToolResult, PluginActionInvokeError> {
916        let Some(op) = self.plugin_actions.get(name).cloned() else {
917            return Err(PluginActionInvokeError::Unknown(name.to_string()));
918        };
919
920        let effective_session = session_id.or_else(|| {
921            if default_to_current_session && !self.session_id.is_empty() {
922                Some(self.session_id.clone())
923            } else {
924                None
925            }
926        });
927
928        match (op.def.session_param, effective_session.as_ref()) {
929            (SessionParam::Required, None) => {
930                return Err(PluginActionInvokeError::MissingSession(name.to_string()));
931            }
932            (SessionParam::Forbidden, Some(_)) => {
933                return Err(PluginActionInvokeError::UnexpectedSession(name.to_string()));
934            }
935            _ => {}
936        }
937
938        Ok((op.handler)(
939            PluginActionContext {
940                session_id: effective_session,
941                host,
942            },
943            args,
944        )
945        .await)
946    }
947
948    pub async fn call_plugin_action<Op: PluginAction>(
949        &self,
950        args: Op::Args,
951        session_id: Option<String>,
952        default_to_current_session: bool,
953        host: Arc<dyn PluginActionHost>,
954    ) -> Result<Op::Output, PluginError> {
955        let args = serde_json::to_value(args)
956            .map_err(|err| PluginError::Invoke(format!("invalid {} args: {err}", Op::NAME)))?;
957        let result = self
958            .invoke_plugin_action(Op::NAME, args, session_id, default_to_current_session, host)
959            .await
960            .map_err(|err| PluginError::Invoke(err.to_string()))?;
961        if !result.is_success() {
962            return Err(PluginError::Invoke(format!(
963                "{} failed: {}",
964                Op::NAME,
965                result.value_for_projection()
966            )));
967        }
968        serde_json::from_value(result.into_value_for_projection())
969            .map_err(|err| PluginError::Invoke(format!("invalid {} output: {err}", Op::NAME)))
970    }
971}