Skip to main content

lash_core/plugin/
session_obj.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use futures_util::stream::{FuturesUnordered, StreamExt};
5use sha2::{Digest, Sha256};
6
7use super::*;
8
9mod directives;
10mod tools;
11
12async fn collect_owned_async<C, O, H, F>(
13    hooks: &[RegisteredHook<H>],
14    ctx: C,
15    hook_kind: &'static str,
16    phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
17    invoke: F,
18) -> Result<Vec<PluginOwned<O>>, PluginError>
19where
20    C: Clone,
21    F: Fn(&H, C) -> PluginFuture<Vec<O>>,
22{
23    let mut out = Vec::new();
24    for registered in hooks {
25        let phase_name = plugin_hook_phase_name(hook_kind, &registered.plugin_id);
26        if let Some(probe) = phase_probe {
27            probe.begin_named(&phase_name);
28        }
29        let result = invoke(&registered.hook, ctx.clone()).await;
30        if let Some(probe) = phase_probe {
31            probe.end_named(&phase_name);
32        }
33        for value in result? {
34            out.push(PluginOwned {
35                plugin_id: registered.plugin_id.clone(),
36                value,
37            });
38        }
39    }
40    Ok(out)
41}
42
43fn plugin_hook_phase_name(hook_kind: &str, plugin_id: &str) -> String {
44    format!("plugin_hook.{hook_kind}.{plugin_id}")
45}
46
47fn lifecycle_event_hook_kind(event: &PluginLifecycleEvent<'_>) -> &'static str {
48    match event {
49        PluginLifecycleEvent::TurnFinalized(_) => "turn_finalized",
50        PluginLifecycleEvent::TurnPersisted(_) => "turn_persisted",
51        PluginLifecycleEvent::SessionRestored(_) => "session_restored",
52        PluginLifecycleEvent::SessionConfigChanged(_) => "session_config_changed",
53    }
54}
55
56fn collect_owned_sync<C, O, H, F>(
57    hooks: &[RegisteredHook<H>],
58    ctx: C,
59    invoke: F,
60) -> Result<Vec<PluginOwned<O>>, PluginError>
61where
62    C: Clone,
63    F: Fn(&H, C) -> Result<O, PluginError>,
64{
65    let mut out = Vec::new();
66    for registered in hooks {
67        out.push(PluginOwned {
68            plugin_id: registered.plugin_id.clone(),
69            value: invoke(&registered.hook, ctx.clone())?,
70        });
71    }
72    Ok(out)
73}
74
75struct EmptySnapshotReader;
76
77impl SnapshotReader for EmptySnapshotReader {
78    fn read_blob(&self, _name: &str) -> Option<&[u8]> {
79        None
80    }
81}
82
83pub struct PluginSession {
84    pub(super) host: PluginHost,
85    pub(super) session_id: String,
86    pub(super) plugins: Vec<Arc<dyn SessionPlugin>>,
87    pub(super) tools: Arc<dyn ToolProvider>,
88    pub(super) tool_registry: Arc<crate::ToolRegistry>,
89    pub(super) tool_catalog_overlay: ToolCatalogContribution,
90    pub(super) tool_access: SessionToolAccess,
91    pub(super) subagent: Option<SubagentSessionContext>,
92    pub(super) extensions: PluginExtensions,
93    pub(super) triggers: crate::TriggerEventCatalog,
94    pub(super) contributions: PluginContributions,
95}
96impl PluginSession {
97    pub fn session_id(&self) -> &str {
98        &self.session_id
99    }
100
101    pub fn tool_access(&self) -> &SessionToolAccess {
102        &self.tool_access
103    }
104
105    pub fn subagent_context(&self) -> Option<&SubagentSessionContext> {
106        self.subagent.as_ref()
107    }
108
109    pub fn extensions(&self) -> &PluginExtensions {
110        &self.extensions
111    }
112
113    pub fn triggers(&self) -> &crate::TriggerEventCatalog {
114        &self.triggers
115    }
116
117    pub fn host(&self) -> &PluginHost {
118        &self.host
119    }
120
121    pub fn tools(&self) -> Arc<dyn ToolProvider> {
122        Arc::clone(&self.tools)
123    }
124
125    pub fn tool_registry(&self) -> Arc<crate::ToolRegistry> {
126        Arc::clone(&self.tool_registry)
127    }
128
129    pub(crate) fn protocol_session(&self) -> &Arc<dyn ProtocolSessionPlugin> {
130        &self
131            .contributions
132            .protocol_session
133            .as_ref()
134            .expect("plugin session must have a protocol session")
135            .hook
136    }
137
138    pub(crate) fn code_executor(&self) -> Option<Arc<dyn CodeExecutorPlugin>> {
139        self.contributions
140            .code_executor
141            .as_ref()
142            .map(|entry| Arc::clone(&entry.hook))
143    }
144
145    pub(crate) fn assistant_prose_projector(
146        &self,
147    ) -> Option<Arc<dyn AssistantProseProjectorPlugin>> {
148        self.contributions
149            .assistant_prose_projector
150            .as_ref()
151            .map(|entry| Arc::clone(&entry.hook))
152    }
153
154    pub fn protocol_driver(&self) -> Arc<dyn ProtocolDriverPlugin> {
155        self.contributions
156            .protocol_driver
157            .as_ref()
158            .map(|entry| Arc::clone(&entry.hook))
159            .expect("plugin session must have a protocol driver")
160    }
161
162    pub fn plugin_operations(&self) -> Vec<PluginOperationDef> {
163        self.contributions
164            .plugin_queries
165            .values()
166            .map(|op| op.def.clone())
167            .chain(
168                self.contributions
169                    .plugin_commands
170                    .values()
171                    .map(|op| op.def.clone()),
172            )
173            .chain(
174                self.contributions
175                    .plugin_tasks
176                    .values()
177                    .map(|op| op.def.clone()),
178            )
179            .collect()
180    }
181
182    pub fn has_assistant_stream_hooks(&self) -> bool {
183        !self.contributions.assistant_stream_hooks.is_empty()
184    }
185
186    pub fn has_assistant_stream_finished_hooks(&self) -> bool {
187        !self
188            .contributions
189            .assistant_stream_finished_hooks
190            .is_empty()
191    }
192
193    /// Chain registered turn-context transforms, piping each one's output
194    /// into the next in priority order.
195    pub async fn prepare_turn_context(
196        &self,
197        ctx: &TurnTransformContext<'_>,
198        input: crate::session_model::context::PreparedContext,
199        phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
200    ) -> Result<crate::session_model::context::PreparedContext, ContextError> {
201        let mut current = input;
202        for (_, registered) in &self.contributions.turn_context_transforms {
203            let phase_name =
204                plugin_hook_phase_name("context_transform", registered.plugin_id.as_str());
205            if let Some(probe) = phase_probe.as_ref() {
206                probe.begin_named(&phase_name);
207            }
208            let result = registered.hook.transform(ctx, current).await;
209            if let Some(probe) = phase_probe.as_ref() {
210                probe.end_named(&phase_name);
211            }
212            current = result?;
213        }
214        Ok(current)
215    }
216
217    /// Ask registered compactors for seed nodes for a new compaction frame.
218    pub async fn compact_context(
219        &self,
220        ctx: &CompactionContext<'_>,
221    ) -> Result<Option<ContextCompaction>, ContextError> {
222        for (_, registered) in &self.contributions.context_compactors {
223            if let Some(compaction) = registered.hook.compact(ctx).await?
224                && !compaction.is_empty()
225            {
226                return Ok(Some(compaction));
227            }
228        }
229        Ok(None)
230    }
231
232    pub async fn collect_prompt_contributions(
233        &self,
234        ctx: PromptHookContext,
235    ) -> Result<Vec<PromptContribution>, PluginError> {
236        let mut out = collect_owned_async(
237            &self.contributions.prompt_contributors,
238            ctx,
239            "prompt_contributor",
240            None,
241            |hook, ctx| hook(ctx),
242        )
243        .await?
244        .into_iter()
245        .map(|owned| owned.value)
246        .collect::<Vec<_>>();
247        let mut seen = BTreeSet::new();
248        out.retain(|contribution| {
249            seen.insert((
250                format!("{:?}", contribution.slot),
251                contribution.priority,
252                contribution.content.trim().to_string(),
253            ))
254        });
255        out.sort_by(|a, b| {
256            format!("{:?}", a.slot)
257                .cmp(&format!("{:?}", b.slot))
258                .then(a.priority.cmp(&b.priority))
259        });
260        Ok(out)
261    }
262
263    pub async fn before_turn(
264        &self,
265        ctx: TurnHookContext,
266    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
267        self.before_turn_with_phase_probe(ctx, None).await
268    }
269
270    async fn before_turn_with_phase_probe(
271        &self,
272        ctx: TurnHookContext,
273        phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
274    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
275        collect_owned_async(
276            &self.contributions.before_turn_hooks,
277            ctx,
278            "before_turn",
279            phase_probe,
280            |hook, ctx| hook(ctx),
281        )
282        .await
283    }
284
285    pub async fn before_tool_call(
286        &self,
287        ctx: ToolCallHookContext,
288    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
289        collect_owned_async(
290            &self.contributions.before_tool_call_hooks,
291            ctx,
292            "before_tool_call",
293            None,
294            |hook, ctx| hook(ctx),
295        )
296        .await
297    }
298
299    pub async fn after_tool_call(
300        &self,
301        ctx: ToolResultHookContext,
302    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
303        collect_owned_async(
304            &self.contributions.after_tool_call_hooks,
305            ctx,
306            "after_tool_call",
307            None,
308            |hook, ctx| hook(ctx),
309        )
310        .await
311    }
312
313    pub async fn after_turn(
314        &self,
315        ctx: TurnResultHookContext,
316    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
317        self.after_turn_with_phase_probe(ctx, None).await
318    }
319
320    async fn after_turn_with_phase_probe(
321        &self,
322        ctx: TurnResultHookContext,
323        phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
324    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
325        collect_owned_async(
326            &self.contributions.after_turn_hooks,
327            ctx,
328            "after_turn",
329            phase_probe,
330            |hook, ctx| hook(ctx),
331        )
332        .await
333    }
334
335    pub async fn at_checkpoint(
336        &self,
337        ctx: CheckpointHookContext,
338    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
339        collect_owned_async(
340            &self.contributions.checkpoint_hooks,
341            ctx,
342            "checkpoint",
343            None,
344            |hook, ctx| hook(ctx),
345        )
346        .await
347    }
348
349    pub async fn transform_assistant_stream(
350        &self,
351        session_id: &str,
352        chunk: String,
353    ) -> Result<Vec<PluginOwned<AssistantStreamTransform>>, PluginError> {
354        let mut current = chunk;
355        let mut transforms = Vec::new();
356        for registered in &self.contributions.assistant_stream_hooks {
357            let transform = (registered.hook)(AssistantStreamHookContext {
358                session_id: session_id.to_string(),
359                chunk: current.clone(),
360            })
361            .await?;
362            current = transform.chunk.clone();
363            transforms.push(PluginOwned {
364                plugin_id: registered.plugin_id.clone(),
365                value: transform,
366            });
367        }
368        Ok(transforms)
369    }
370
371    pub async fn transform_assistant_response(
372        &self,
373        session_id: &str,
374        response: crate::llm::types::LlmResponse,
375    ) -> Result<Vec<PluginOwned<AssistantResponseTransform>>, PluginError> {
376        let mut current = response;
377        let mut transforms = Vec::new();
378        for registered in &self.contributions.assistant_response_hooks {
379            let transform = (registered.hook)(AssistantResponseHookContext {
380                session_id: session_id.to_string(),
381                response: current.clone(),
382            })
383            .await?;
384            current = transform.response.clone();
385            transforms.push(PluginOwned {
386                plugin_id: registered.plugin_id.clone(),
387                value: transform,
388            });
389        }
390        Ok(transforms)
391    }
392
393    pub async fn finish_assistant_stream(
394        &self,
395        session_id: &str,
396        reason: AssistantStreamFinishReason,
397    ) -> Result<(), PluginError> {
398        for registered in &self.contributions.assistant_stream_finished_hooks {
399            (registered.hook)(AssistantStreamFinishedContext {
400                session_id: session_id.to_string(),
401                reason,
402            })
403            .await?;
404        }
405        Ok(())
406    }
407
408    pub async fn project_tool_result(
409        &self,
410        ctx: ToolResultProjectionContext,
411    ) -> Result<crate::ModelToolReturn, PluginError> {
412        let Some(projector) = &self.contributions.tool_result_projector else {
413            return Ok(crate::ModelToolReturn::from_output(
414                ctx.call_id.clone(),
415                ctx.tool_name.clone(),
416                &ctx.output,
417            ));
418        };
419        (projector.hook)(ctx).await
420    }
421
422    pub async fn emit_runtime_event(&self, event: PluginLifecycleEvent<'_>) {
423        self.emit_runtime_event_with_phase_probe(event, None).await;
424    }
425
426    pub async fn emit_runtime_event_with_phase_probe(
427        &self,
428        event: PluginLifecycleEvent<'_>,
429        phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
430    ) {
431        let hook_kind = lifecycle_event_hook_kind(&event);
432        let mut pending = FuturesUnordered::new();
433        for registered in &self.contributions.runtime_event_hooks {
434            let hook = Arc::clone(&registered.hook);
435            let plugin_id = registered.plugin_id.clone();
436            let phase_name = plugin_hook_phase_name(hook_kind, registered.plugin_id.as_str());
437            let event = event.clone();
438            let phase_probe = phase_probe.clone();
439            pending.push(async move {
440                if let Some(probe) = phase_probe.as_ref() {
441                    probe.begin_named(&phase_name);
442                }
443                let result = hook(event).await;
444                if let Some(probe) = phase_probe.as_ref() {
445                    probe.end_named(&phase_name);
446                }
447                (plugin_id, result)
448            });
449        }
450        while let Some((plugin_id, result)) = pending.next().await {
451            if let Err(err) = result {
452                tracing::warn!(plugin_id, "plugin runtime event hook failed: {err}");
453            }
454        }
455    }
456
457    pub fn has_runtime_event_hooks(&self) -> bool {
458        !self.contributions.runtime_event_hooks.is_empty()
459    }
460
461    pub async fn mutate_session_config(
462        &self,
463        ctx: SessionConfigChangedContext,
464        mut policy: SessionPolicy,
465    ) -> SessionPolicy {
466        for hook in &self.contributions.session_config_mutators {
467            match hook(ctx.clone(), policy.clone()).await {
468                Ok(next_policy) => policy = next_policy,
469                Err(err) => tracing::warn!("plugin config mutator failed: {err}"),
470            }
471        }
472        policy
473    }
474
475    pub fn snapshot(&self) -> Result<PluginSessionSnapshot, PluginError> {
476        let mut plugins = BTreeMap::new();
477        for plugin in &self.plugins {
478            let mut writer = InMemorySnapshotWriter::default();
479            let meta = plugin.snapshot(&mut writer)?;
480            plugins.insert(
481                plugin.id().to_string(),
482                PluginSnapshotEntry {
483                    meta,
484                    artifacts: writer.finish(),
485                },
486            );
487        }
488        Ok(PluginSessionSnapshot { plugins })
489    }
490
491    pub fn snapshot_is_current(&self, previous: Option<&PluginSessionSnapshot>) -> bool {
492        let Some(previous) = previous else {
493            return false;
494        };
495        if previous.plugins.len() != self.plugins.len() {
496            return false;
497        }
498        for plugin in &self.plugins {
499            let Some(entry) = previous.plugins.get(plugin.id()) else {
500                return false;
501            };
502            if entry.meta.plugin_version != plugin.version()
503                || entry.meta.revision != plugin.snapshot_revision()
504            {
505                return false;
506            }
507        }
508        true
509    }
510
511    pub fn snapshot_revision_fingerprint(&self) -> u64 {
512        let mut hasher = Sha256::new();
513        for plugin in &self.plugins {
514            hasher.update(plugin.id().as_bytes());
515            hasher.update([0]);
516            hasher.update(plugin.version().as_bytes());
517            hasher.update([0]);
518            hasher.update(plugin.snapshot_revision().to_le_bytes());
519            hasher.update([0xff]);
520        }
521        let digest = hasher.finalize();
522        u64::from_le_bytes(digest[..8].try_into().expect("digest prefix"))
523    }
524
525    pub fn restore(&self, snapshot: &PluginSessionSnapshot) -> Result<(), PluginError> {
526        for plugin in &self.plugins {
527            if let Some(entry) = snapshot.plugins.get(plugin.id()) {
528                let reader = InMemorySnapshotReader { entry };
529                plugin.restore(&entry.meta, &reader)?;
530            } else {
531                plugin.restore(
532                    &PluginSnapshotMeta {
533                        plugin_id: plugin.id().to_string(),
534                        plugin_version: plugin.version().to_string(),
535                        revision: plugin.snapshot_revision(),
536                        state: None,
537                    },
538                    &EmptySnapshotReader,
539                )?;
540            }
541        }
542        Ok(())
543    }
544
545    pub fn fork_for_session(
546        &self,
547        session_id: impl Into<String>,
548    ) -> Result<Arc<PluginSession>, PluginError> {
549        let snapshot = self.snapshot()?;
550        self.host.build_session_with_overlay(
551            session_id,
552            Some(&snapshot),
553            self.tool_catalog_overlay.clone(),
554            Some(self.tool_registry.export_state()),
555        )
556    }
557
558    pub fn fork_for_child_session(
559        &self,
560        session_id: impl Into<String>,
561        parent_session_id: Option<String>,
562        authority: super::SessionAuthorityContext,
563    ) -> Result<Arc<PluginSession>, PluginError> {
564        let snapshot = self.snapshot()?;
565        self.host.build_session_with_parent_and_overlay(
566            session_id,
567            parent_session_id,
568            Some(&snapshot),
569            self.tool_catalog_overlay.clone(),
570            Some(self.tool_registry.export_state()),
571            authority,
572        )
573    }
574
575    pub fn fork_for_session_with_tool_catalog(
576        &self,
577        session_id: impl Into<String>,
578        tool_catalog_overlay: ToolCatalogContribution,
579    ) -> Result<Arc<PluginSession>, PluginError> {
580        let snapshot = self.snapshot()?;
581        self.host.build_session_with_overlay(
582            session_id,
583            Some(&snapshot),
584            tool_catalog_overlay,
585            Some(self.tool_registry.export_state()),
586        )
587    }
588
589    fn effective_operation_session(
590        &self,
591        name: &str,
592        session_param: SessionParam,
593        session_id: Option<String>,
594        default_to_current_session: bool,
595    ) -> Result<Option<String>, PluginOperationInvokeError> {
596        let effective_session = session_id.or_else(|| {
597            if default_to_current_session && !self.session_id.is_empty() {
598                Some(self.session_id.clone())
599            } else {
600                None
601            }
602        });
603
604        match (session_param, effective_session.as_ref()) {
605            (SessionParam::Required, None) => {
606                return Err(PluginOperationInvokeError::MissingSession(name.to_string()));
607            }
608            (SessionParam::Forbidden, Some(_)) => {
609                return Err(PluginOperationInvokeError::UnexpectedSession(
610                    name.to_string(),
611                ));
612            }
613            _ => {}
614        }
615        Ok(effective_session)
616    }
617
618    pub(crate) async fn query_plugin(
619        &self,
620        name: &str,
621        args: serde_json::Value,
622        session_id: Option<String>,
623        default_to_current_session: bool,
624        sessions: Arc<dyn SessionReadService>,
625        processes: Arc<dyn ProcessReadService>,
626    ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
627        let Some(op) = self.contributions.plugin_queries.get(name).cloned() else {
628            return Err(PluginOperationInvokeError::Unknown(name.to_string()));
629        };
630        let effective_session = self.effective_operation_session(
631            name,
632            op.def.session_param,
633            session_id,
634            default_to_current_session,
635        )?;
636        let output = (op.handler)(
637            PluginQueryContext {
638                session_id: effective_session,
639                sessions,
640                processes,
641            },
642            args,
643        )
644        .await
645        .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
646        Ok((op.plugin_id, output))
647    }
648
649    #[expect(
650        clippy::too_many_arguments,
651        reason = "plugin command invocation carries the runtime mutation services exposed to commands"
652    )]
653    pub async fn run_plugin_command_value(
654        &self,
655        name: &str,
656        args: serde_json::Value,
657        session_id: Option<String>,
658        default_to_current_session: bool,
659        sessions: Arc<dyn SessionStateService>,
660        session_lifecycle: Arc<dyn SessionLifecycleService>,
661        session_graph: Arc<dyn SessionGraphService>,
662        processes: Arc<dyn crate::ProcessService>,
663    ) -> Result<(String, PluginCommandOutcome<serde_json::Value>), PluginOperationInvokeError> {
664        let (plugin_id, outcome) = self
665            .run_plugin_command(
666                name,
667                args,
668                session_id,
669                default_to_current_session,
670                sessions,
671                session_lifecycle,
672                session_graph,
673                processes,
674            )
675            .await?;
676        Ok((
677            plugin_id,
678            PluginCommandOutcome {
679                output: outcome.output,
680                events: outcome.events,
681                directives: outcome.directives,
682            },
683        ))
684    }
685
686    #[expect(
687        clippy::too_many_arguments,
688        reason = "plugin command invocation carries the runtime mutation services exposed to commands"
689    )]
690    pub(crate) async fn run_plugin_command(
691        &self,
692        name: &str,
693        args: serde_json::Value,
694        session_id: Option<String>,
695        default_to_current_session: bool,
696        sessions: Arc<dyn SessionStateService>,
697        session_lifecycle: Arc<dyn SessionLifecycleService>,
698        session_graph: Arc<dyn SessionGraphService>,
699        processes: Arc<dyn crate::ProcessService>,
700    ) -> Result<(String, ErasedPluginCommandOutcome), PluginOperationInvokeError> {
701        let Some(op) = self.contributions.plugin_commands.get(name).cloned() else {
702            return Err(PluginOperationInvokeError::Unknown(name.to_string()));
703        };
704        let effective_session = self.effective_operation_session(
705            name,
706            op.def.session_param,
707            session_id,
708            default_to_current_session,
709        )?;
710        let outcome = (op.handler)(
711            PluginCommandContext {
712                session_id: effective_session,
713                sessions,
714                session_lifecycle,
715                session_graph,
716                processes,
717            },
718            args,
719        )
720        .await
721        .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
722        Ok((op.plugin_id, outcome))
723    }
724
725    #[expect(
726        clippy::too_many_arguments,
727        reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
728    )]
729    pub async fn run_plugin_task_value(
730        &self,
731        name: &str,
732        args: serde_json::Value,
733        session_id: Option<String>,
734        default_to_current_session: bool,
735        sessions: Arc<dyn SessionStateService>,
736        session_lifecycle: Arc<dyn SessionLifecycleService>,
737        session_graph: Arc<dyn SessionGraphService>,
738        processes: Arc<dyn crate::ProcessService>,
739        scoped_effect_controller: crate::ScopedEffectController<'static>,
740        cancellation_token: tokio_util::sync::CancellationToken,
741    ) -> Result<(String, PluginTaskOutcome<serde_json::Value>), PluginOperationInvokeError> {
742        let (plugin_id, outcome) = self
743            .run_plugin_task(
744                name,
745                args,
746                session_id,
747                default_to_current_session,
748                sessions,
749                session_lifecycle,
750                session_graph,
751                processes,
752                scoped_effect_controller,
753                cancellation_token,
754            )
755            .await?;
756        Ok((
757            plugin_id,
758            PluginTaskOutcome {
759                output: outcome.output,
760                events: outcome.events,
761                directives: outcome.directives,
762            },
763        ))
764    }
765
766    #[expect(
767        clippy::too_many_arguments,
768        reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
769    )]
770    pub(crate) async fn run_plugin_task(
771        &self,
772        name: &str,
773        args: serde_json::Value,
774        session_id: Option<String>,
775        default_to_current_session: bool,
776        sessions: Arc<dyn SessionStateService>,
777        session_lifecycle: Arc<dyn SessionLifecycleService>,
778        session_graph: Arc<dyn SessionGraphService>,
779        processes: Arc<dyn crate::ProcessService>,
780        scoped_effect_controller: crate::ScopedEffectController<'static>,
781        cancellation_token: tokio_util::sync::CancellationToken,
782    ) -> Result<(String, ErasedPluginTaskOutcome), PluginOperationInvokeError> {
783        let Some(op) = self.contributions.plugin_tasks.get(name).cloned() else {
784            return Err(PluginOperationInvokeError::Unknown(name.to_string()));
785        };
786        let effective_session = self.effective_operation_session(
787            name,
788            op.def.session_param,
789            session_id,
790            default_to_current_session,
791        )?;
792        let outcome = (op.handler)(
793            PluginTaskContext {
794                session_id: effective_session,
795                sessions,
796                session_lifecycle,
797                session_graph,
798                processes,
799                scoped_effect_controller,
800                cancellation_token,
801            },
802            args,
803        )
804        .await
805        .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
806        Ok((op.plugin_id, outcome))
807    }
808}