Skip to main content

lash_core/plugin/
session_obj.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use futures_util::stream::{FuturesUnordered, StreamExt};
5use sha2::{Digest, Sha256};
6
7use super::*;
8
9mod directives;
10mod tools;
11
12async fn collect_owned_async<C, O, H, F>(
13    hooks: &[RegisteredHook<H>],
14    ctx: C,
15    hook_kind: &'static str,
16    phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
17    invoke: F,
18) -> Result<Vec<PluginOwned<O>>, PluginError>
19where
20    C: Clone,
21    F: Fn(&H, C) -> PluginFuture<Vec<O>>,
22{
23    let mut out = Vec::new();
24    for registered in hooks {
25        let phase_name = plugin_hook_phase_name(hook_kind, &registered.plugin_id);
26        if let Some(probe) = phase_probe {
27            probe.begin_named(&phase_name);
28        }
29        let result = invoke(&registered.hook, ctx.clone()).await;
30        if let Some(probe) = phase_probe {
31            probe.end_named(&phase_name);
32        }
33        for value in result? {
34            out.push(PluginOwned {
35                plugin_id: registered.plugin_id.clone(),
36                value,
37            });
38        }
39    }
40    Ok(out)
41}
42
/// Builds the dotted probe phase name `plugin_hook.<hook_kind>.<plugin_id>`.
fn plugin_hook_phase_name(hook_kind: &str, plugin_id: &str) -> String {
    ["plugin_hook", hook_kind, plugin_id].join(".")
}
46
47fn lifecycle_event_hook_kind(event: &PluginLifecycleEvent<'_>) -> &'static str {
48    match event {
49        PluginLifecycleEvent::TurnFinalized(_) => "turn_finalized",
50        PluginLifecycleEvent::TurnPersisted(_) => "turn_persisted",
51        PluginLifecycleEvent::SessionRestored(_) => "session_restored",
52        PluginLifecycleEvent::SessionConfigChanged(_) => "session_config_changed",
53    }
54}
55
56fn collect_owned_sync<C, O, H, F>(
57    hooks: &[RegisteredHook<H>],
58    ctx: C,
59    invoke: F,
60) -> Result<Vec<PluginOwned<O>>, PluginError>
61where
62    C: Clone,
63    F: Fn(&H, C) -> Result<O, PluginError>,
64{
65    let mut out = Vec::new();
66    for registered in hooks {
67        out.push(PluginOwned {
68            plugin_id: registered.plugin_id.clone(),
69            value: invoke(&registered.hook, ctx.clone())?,
70        });
71    }
72    Ok(out)
73}
74
75struct EmptySnapshotReader;
76
77impl SnapshotReader for EmptySnapshotReader {
78    fn read_blob(&self, _name: &str) -> Option<&[u8]> {
79        None
80    }
81}
82
83pub struct PluginSession {
84    pub(super) host: PluginHost,
85    pub(super) session_id: String,
86    pub(super) plugins: Vec<Arc<dyn SessionPlugin>>,
87    pub(super) tools: Arc<dyn ToolProvider>,
88    pub(super) tool_registry: Arc<crate::ToolRegistry>,
89    pub(super) tool_catalog_overlay: ToolCatalogContribution,
90    pub(super) tool_access: SessionToolAccess,
91    pub(super) subagent: Option<SubagentSessionContext>,
92    pub(super) extensions: PluginExtensions,
93    pub(super) triggers: crate::TriggerEventCatalog,
94    pub(super) contributions: PluginContributions,
95}
96impl PluginSession {
97    pub fn session_id(&self) -> &str {
98        &self.session_id
99    }
100
101    pub fn tool_access(&self) -> &SessionToolAccess {
102        &self.tool_access
103    }
104
105    pub fn subagent_context(&self) -> Option<&SubagentSessionContext> {
106        self.subagent.as_ref()
107    }
108
109    pub fn extensions(&self) -> &PluginExtensions {
110        &self.extensions
111    }
112
113    pub fn triggers(&self) -> &crate::TriggerEventCatalog {
114        &self.triggers
115    }
116
117    pub fn host(&self) -> &PluginHost {
118        &self.host
119    }
120
121    pub fn tools(&self) -> Arc<dyn ToolProvider> {
122        Arc::clone(&self.tools)
123    }
124
125    pub fn tool_registry(&self) -> Arc<crate::ToolRegistry> {
126        Arc::clone(&self.tool_registry)
127    }
128
129    pub(crate) fn protocol_session(&self) -> &Arc<dyn ProtocolSessionPlugin> {
130        &self
131            .contributions
132            .protocol_session
133            .as_ref()
134            .expect("plugin session must have a protocol session")
135            .hook
136    }
137
138    pub(crate) fn code_executor(&self) -> Option<Arc<dyn CodeExecutorPlugin>> {
139        self.contributions
140            .code_executor
141            .as_ref()
142            .map(|entry| Arc::clone(&entry.hook))
143    }
144
145    pub(crate) fn assistant_prose_projector(
146        &self,
147    ) -> Option<Arc<dyn AssistantProseProjectorPlugin>> {
148        self.contributions
149            .assistant_prose_projector
150            .as_ref()
151            .map(|entry| Arc::clone(&entry.hook))
152    }
153
154    pub fn protocol_driver(&self) -> Arc<dyn ProtocolDriverPlugin> {
155        self.contributions
156            .protocol_driver
157            .as_ref()
158            .map(|entry| Arc::clone(&entry.hook))
159            .expect("plugin session must have a protocol driver")
160    }
161
162    pub fn plugin_operations(&self) -> Vec<PluginOperationDef> {
163        self.contributions
164            .plugin_queries
165            .values()
166            .map(|op| op.def.clone())
167            .chain(
168                self.contributions
169                    .plugin_commands
170                    .values()
171                    .map(|op| op.def.clone()),
172            )
173            .chain(
174                self.contributions
175                    .plugin_tasks
176                    .values()
177                    .map(|op| op.def.clone()),
178            )
179            .collect()
180    }
181
182    pub fn has_assistant_stream_hooks(&self) -> bool {
183        !self.contributions.assistant_stream_hooks.is_empty()
184    }
185
186    /// Chain registered turn-context transforms, piping each one's output
187    /// into the next in priority order.
188    pub async fn prepare_turn_context(
189        &self,
190        ctx: &TurnTransformContext<'_>,
191        input: crate::session_model::context::PreparedContext,
192        phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
193    ) -> Result<crate::session_model::context::PreparedContext, ContextError> {
194        let mut current = input;
195        for (_, registered) in &self.contributions.turn_context_transforms {
196            let phase_name =
197                plugin_hook_phase_name("context_transform", registered.plugin_id.as_str());
198            if let Some(probe) = phase_probe.as_ref() {
199                probe.begin_named(&phase_name);
200            }
201            let result = registered.hook.transform(ctx, current).await;
202            if let Some(probe) = phase_probe.as_ref() {
203                probe.end_named(&phase_name);
204            }
205            current = result?;
206        }
207        Ok(current)
208    }
209
210    /// Ask registered compactors for seed nodes for a new compaction frame.
211    pub async fn compact_context(
212        &self,
213        ctx: &CompactionContext<'_>,
214    ) -> Result<Option<ContextCompaction>, ContextError> {
215        for (_, registered) in &self.contributions.context_compactors {
216            if let Some(compaction) = registered.hook.compact(ctx).await?
217                && !compaction.is_empty()
218            {
219                return Ok(Some(compaction));
220            }
221        }
222        Ok(None)
223    }
224
225    pub async fn collect_prompt_contributions(
226        &self,
227        ctx: PromptHookContext,
228    ) -> Result<Vec<PromptContribution>, PluginError> {
229        let mut out = collect_owned_async(
230            &self.contributions.prompt_contributors,
231            ctx,
232            "prompt_contributor",
233            None,
234            |hook, ctx| hook(ctx),
235        )
236        .await?
237        .into_iter()
238        .map(|owned| owned.value)
239        .collect::<Vec<_>>();
240        let mut seen = BTreeSet::new();
241        out.retain(|contribution| {
242            seen.insert((
243                format!("{:?}", contribution.slot),
244                contribution.priority,
245                contribution.content.trim().to_string(),
246            ))
247        });
248        out.sort_by(|a, b| {
249            format!("{:?}", a.slot)
250                .cmp(&format!("{:?}", b.slot))
251                .then(a.priority.cmp(&b.priority))
252        });
253        Ok(out)
254    }
255
256    pub async fn before_turn(
257        &self,
258        ctx: TurnHookContext,
259    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
260        self.before_turn_with_phase_probe(ctx, None).await
261    }
262
263    async fn before_turn_with_phase_probe(
264        &self,
265        ctx: TurnHookContext,
266        phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
267    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
268        collect_owned_async(
269            &self.contributions.before_turn_hooks,
270            ctx,
271            "before_turn",
272            phase_probe,
273            |hook, ctx| hook(ctx),
274        )
275        .await
276    }
277
278    pub async fn before_tool_call(
279        &self,
280        ctx: ToolCallHookContext,
281    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
282        collect_owned_async(
283            &self.contributions.before_tool_call_hooks,
284            ctx,
285            "before_tool_call",
286            None,
287            |hook, ctx| hook(ctx),
288        )
289        .await
290    }
291
292    pub async fn after_tool_call(
293        &self,
294        ctx: ToolResultHookContext,
295    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
296        collect_owned_async(
297            &self.contributions.after_tool_call_hooks,
298            ctx,
299            "after_tool_call",
300            None,
301            |hook, ctx| hook(ctx),
302        )
303        .await
304    }
305
306    pub async fn after_turn(
307        &self,
308        ctx: TurnResultHookContext,
309    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
310        self.after_turn_with_phase_probe(ctx, None).await
311    }
312
313    async fn after_turn_with_phase_probe(
314        &self,
315        ctx: TurnResultHookContext,
316        phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
317    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
318        collect_owned_async(
319            &self.contributions.after_turn_hooks,
320            ctx,
321            "after_turn",
322            phase_probe,
323            |hook, ctx| hook(ctx),
324        )
325        .await
326    }
327
328    pub async fn at_checkpoint(
329        &self,
330        ctx: CheckpointHookContext,
331    ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
332        collect_owned_async(
333            &self.contributions.checkpoint_hooks,
334            ctx,
335            "checkpoint",
336            None,
337            |hook, ctx| hook(ctx),
338        )
339        .await
340    }
341
342    pub async fn transform_assistant_stream(
343        &self,
344        session_id: &str,
345        chunk: String,
346    ) -> Result<Vec<PluginOwned<AssistantStreamTransform>>, PluginError> {
347        let mut current = chunk;
348        let mut transforms = Vec::new();
349        for registered in &self.contributions.assistant_stream_hooks {
350            let transform = (registered.hook)(AssistantStreamHookContext {
351                session_id: session_id.to_string(),
352                chunk: current.clone(),
353            })
354            .await?;
355            current = transform.chunk.clone();
356            transforms.push(PluginOwned {
357                plugin_id: registered.plugin_id.clone(),
358                value: transform,
359            });
360        }
361        Ok(transforms)
362    }
363
364    pub async fn transform_assistant_response(
365        &self,
366        session_id: &str,
367        response: crate::llm::types::LlmResponse,
368    ) -> Result<Vec<PluginOwned<AssistantResponseTransform>>, PluginError> {
369        let mut current = response;
370        let mut transforms = Vec::new();
371        for registered in &self.contributions.assistant_response_hooks {
372            let transform = (registered.hook)(AssistantResponseHookContext {
373                session_id: session_id.to_string(),
374                response: current.clone(),
375            })
376            .await?;
377            current = transform.response.clone();
378            transforms.push(PluginOwned {
379                plugin_id: registered.plugin_id.clone(),
380                value: transform,
381            });
382        }
383        Ok(transforms)
384    }
385
386    pub async fn project_tool_result(
387        &self,
388        ctx: ToolResultProjectionContext,
389    ) -> Result<crate::ModelToolReturn, PluginError> {
390        let Some(projector) = &self.contributions.tool_result_projector else {
391            return Ok(crate::ModelToolReturn::from_output(
392                ctx.call_id.clone(),
393                ctx.tool_name.clone(),
394                &ctx.output,
395            ));
396        };
397        (projector.hook)(ctx).await
398    }
399
400    pub async fn emit_runtime_event(&self, event: PluginLifecycleEvent<'_>) {
401        self.emit_runtime_event_with_phase_probe(event, None).await;
402    }
403
404    pub async fn emit_runtime_event_with_phase_probe(
405        &self,
406        event: PluginLifecycleEvent<'_>,
407        phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
408    ) {
409        let hook_kind = lifecycle_event_hook_kind(&event);
410        let mut pending = FuturesUnordered::new();
411        for registered in &self.contributions.runtime_event_hooks {
412            let hook = Arc::clone(&registered.hook);
413            let plugin_id = registered.plugin_id.clone();
414            let phase_name = plugin_hook_phase_name(hook_kind, registered.plugin_id.as_str());
415            let event = event.clone();
416            let phase_probe = phase_probe.clone();
417            pending.push(async move {
418                if let Some(probe) = phase_probe.as_ref() {
419                    probe.begin_named(&phase_name);
420                }
421                let result = hook(event).await;
422                if let Some(probe) = phase_probe.as_ref() {
423                    probe.end_named(&phase_name);
424                }
425                (plugin_id, result)
426            });
427        }
428        while let Some((plugin_id, result)) = pending.next().await {
429            if let Err(err) = result {
430                tracing::warn!(plugin_id, "plugin runtime event hook failed: {err}");
431            }
432        }
433    }
434
435    pub fn has_runtime_event_hooks(&self) -> bool {
436        !self.contributions.runtime_event_hooks.is_empty()
437    }
438
439    pub async fn mutate_session_config(
440        &self,
441        ctx: SessionConfigChangedContext,
442        mut policy: SessionPolicy,
443    ) -> SessionPolicy {
444        for hook in &self.contributions.session_config_mutators {
445            match hook(ctx.clone(), policy.clone()).await {
446                Ok(next_policy) => policy = next_policy,
447                Err(err) => tracing::warn!("plugin config mutator failed: {err}"),
448            }
449        }
450        policy
451    }
452
453    pub fn snapshot(&self) -> Result<PluginSessionSnapshot, PluginError> {
454        let mut plugins = BTreeMap::new();
455        for plugin in &self.plugins {
456            let mut writer = InMemorySnapshotWriter::default();
457            let meta = plugin.snapshot(&mut writer)?;
458            plugins.insert(
459                plugin.id().to_string(),
460                PluginSnapshotEntry {
461                    meta,
462                    artifacts: writer.finish(),
463                },
464            );
465        }
466        Ok(PluginSessionSnapshot { plugins })
467    }
468
469    pub fn snapshot_is_current(&self, previous: Option<&PluginSessionSnapshot>) -> bool {
470        let Some(previous) = previous else {
471            return false;
472        };
473        if previous.plugins.len() != self.plugins.len() {
474            return false;
475        }
476        for plugin in &self.plugins {
477            let Some(entry) = previous.plugins.get(plugin.id()) else {
478                return false;
479            };
480            if entry.meta.plugin_version != plugin.version()
481                || entry.meta.revision != plugin.snapshot_revision()
482            {
483                return false;
484            }
485        }
486        true
487    }
488
489    pub fn snapshot_revision_fingerprint(&self) -> u64 {
490        let mut hasher = Sha256::new();
491        for plugin in &self.plugins {
492            hasher.update(plugin.id().as_bytes());
493            hasher.update([0]);
494            hasher.update(plugin.version().as_bytes());
495            hasher.update([0]);
496            hasher.update(plugin.snapshot_revision().to_le_bytes());
497            hasher.update([0xff]);
498        }
499        let digest = hasher.finalize();
500        u64::from_le_bytes(digest[..8].try_into().expect("digest prefix"))
501    }
502
503    pub fn restore(&self, snapshot: &PluginSessionSnapshot) -> Result<(), PluginError> {
504        for plugin in &self.plugins {
505            if let Some(entry) = snapshot.plugins.get(plugin.id()) {
506                let reader = InMemorySnapshotReader { entry };
507                plugin.restore(&entry.meta, &reader)?;
508            } else {
509                plugin.restore(
510                    &PluginSnapshotMeta {
511                        plugin_id: plugin.id().to_string(),
512                        plugin_version: plugin.version().to_string(),
513                        revision: plugin.snapshot_revision(),
514                        state: None,
515                    },
516                    &EmptySnapshotReader,
517                )?;
518            }
519        }
520        Ok(())
521    }
522
523    pub fn fork_for_session(
524        &self,
525        session_id: impl Into<String>,
526    ) -> Result<Arc<PluginSession>, PluginError> {
527        let snapshot = self.snapshot()?;
528        self.host.build_session_with_overlay(
529            session_id,
530            Some(&snapshot),
531            self.tool_catalog_overlay.clone(),
532            Some(self.tool_registry.export_state()),
533        )
534    }
535
536    pub fn fork_for_child_session(
537        &self,
538        session_id: impl Into<String>,
539        parent_session_id: Option<String>,
540        authority: super::SessionAuthorityContext,
541    ) -> Result<Arc<PluginSession>, PluginError> {
542        let snapshot = self.snapshot()?;
543        self.host.build_session_with_parent_and_overlay(
544            session_id,
545            parent_session_id,
546            Some(&snapshot),
547            self.tool_catalog_overlay.clone(),
548            Some(self.tool_registry.export_state()),
549            authority,
550        )
551    }
552
553    pub fn fork_for_session_with_tool_catalog(
554        &self,
555        session_id: impl Into<String>,
556        tool_catalog_overlay: ToolCatalogContribution,
557    ) -> Result<Arc<PluginSession>, PluginError> {
558        let snapshot = self.snapshot()?;
559        self.host.build_session_with_overlay(
560            session_id,
561            Some(&snapshot),
562            tool_catalog_overlay,
563            Some(self.tool_registry.export_state()),
564        )
565    }
566
567    fn effective_operation_session(
568        &self,
569        name: &str,
570        session_param: SessionParam,
571        session_id: Option<String>,
572        default_to_current_session: bool,
573    ) -> Result<Option<String>, PluginOperationInvokeError> {
574        let effective_session = session_id.or_else(|| {
575            if default_to_current_session && !self.session_id.is_empty() {
576                Some(self.session_id.clone())
577            } else {
578                None
579            }
580        });
581
582        match (session_param, effective_session.as_ref()) {
583            (SessionParam::Required, None) => {
584                return Err(PluginOperationInvokeError::MissingSession(name.to_string()));
585            }
586            (SessionParam::Forbidden, Some(_)) => {
587                return Err(PluginOperationInvokeError::UnexpectedSession(
588                    name.to_string(),
589                ));
590            }
591            _ => {}
592        }
593        Ok(effective_session)
594    }
595
596    pub(crate) async fn query_plugin(
597        &self,
598        name: &str,
599        args: serde_json::Value,
600        session_id: Option<String>,
601        default_to_current_session: bool,
602        sessions: Arc<dyn SessionReadService>,
603        processes: Arc<dyn ProcessReadService>,
604    ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
605        let Some(op) = self.contributions.plugin_queries.get(name).cloned() else {
606            return Err(PluginOperationInvokeError::Unknown(name.to_string()));
607        };
608        let effective_session = self.effective_operation_session(
609            name,
610            op.def.session_param,
611            session_id,
612            default_to_current_session,
613        )?;
614        let output = (op.handler)(
615            PluginQueryContext {
616                session_id: effective_session,
617                sessions,
618                processes,
619            },
620            args,
621        )
622        .await
623        .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
624        Ok((op.plugin_id, output))
625    }
626
627    #[expect(
628        clippy::too_many_arguments,
629        reason = "plugin command invocation carries the runtime mutation services exposed to commands"
630    )]
631    pub async fn run_plugin_command_value(
632        &self,
633        name: &str,
634        args: serde_json::Value,
635        session_id: Option<String>,
636        default_to_current_session: bool,
637        sessions: Arc<dyn SessionStateService>,
638        session_lifecycle: Arc<dyn SessionLifecycleService>,
639        session_graph: Arc<dyn SessionGraphService>,
640        processes: Arc<dyn crate::ProcessService>,
641    ) -> Result<(String, PluginCommandOutcome<serde_json::Value>), PluginOperationInvokeError> {
642        let (plugin_id, outcome) = self
643            .run_plugin_command(
644                name,
645                args,
646                session_id,
647                default_to_current_session,
648                sessions,
649                session_lifecycle,
650                session_graph,
651                processes,
652            )
653            .await?;
654        Ok((
655            plugin_id,
656            PluginCommandOutcome {
657                output: outcome.output,
658                events: outcome.events,
659                directives: outcome.directives,
660            },
661        ))
662    }
663
664    #[expect(
665        clippy::too_many_arguments,
666        reason = "plugin command invocation carries the runtime mutation services exposed to commands"
667    )]
668    pub(crate) async fn run_plugin_command(
669        &self,
670        name: &str,
671        args: serde_json::Value,
672        session_id: Option<String>,
673        default_to_current_session: bool,
674        sessions: Arc<dyn SessionStateService>,
675        session_lifecycle: Arc<dyn SessionLifecycleService>,
676        session_graph: Arc<dyn SessionGraphService>,
677        processes: Arc<dyn crate::ProcessService>,
678    ) -> Result<(String, ErasedPluginCommandOutcome), PluginOperationInvokeError> {
679        let Some(op) = self.contributions.plugin_commands.get(name).cloned() else {
680            return Err(PluginOperationInvokeError::Unknown(name.to_string()));
681        };
682        let effective_session = self.effective_operation_session(
683            name,
684            op.def.session_param,
685            session_id,
686            default_to_current_session,
687        )?;
688        let outcome = (op.handler)(
689            PluginCommandContext {
690                session_id: effective_session,
691                sessions,
692                session_lifecycle,
693                session_graph,
694                processes,
695            },
696            args,
697        )
698        .await
699        .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
700        Ok((op.plugin_id, outcome))
701    }
702
703    #[expect(
704        clippy::too_many_arguments,
705        reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
706    )]
707    pub async fn run_plugin_task_value(
708        &self,
709        name: &str,
710        args: serde_json::Value,
711        session_id: Option<String>,
712        default_to_current_session: bool,
713        sessions: Arc<dyn SessionStateService>,
714        session_lifecycle: Arc<dyn SessionLifecycleService>,
715        session_graph: Arc<dyn SessionGraphService>,
716        processes: Arc<dyn crate::ProcessService>,
717        scoped_effect_controller: crate::ScopedEffectController<'static>,
718        cancellation_token: tokio_util::sync::CancellationToken,
719    ) -> Result<(String, PluginTaskOutcome<serde_json::Value>), PluginOperationInvokeError> {
720        let (plugin_id, outcome) = self
721            .run_plugin_task(
722                name,
723                args,
724                session_id,
725                default_to_current_session,
726                sessions,
727                session_lifecycle,
728                session_graph,
729                processes,
730                scoped_effect_controller,
731                cancellation_token,
732            )
733            .await?;
734        Ok((
735            plugin_id,
736            PluginTaskOutcome {
737                output: outcome.output,
738                events: outcome.events,
739                directives: outcome.directives,
740            },
741        ))
742    }
743
744    #[expect(
745        clippy::too_many_arguments,
746        reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
747    )]
748    pub(crate) async fn run_plugin_task(
749        &self,
750        name: &str,
751        args: serde_json::Value,
752        session_id: Option<String>,
753        default_to_current_session: bool,
754        sessions: Arc<dyn SessionStateService>,
755        session_lifecycle: Arc<dyn SessionLifecycleService>,
756        session_graph: Arc<dyn SessionGraphService>,
757        processes: Arc<dyn crate::ProcessService>,
758        scoped_effect_controller: crate::ScopedEffectController<'static>,
759        cancellation_token: tokio_util::sync::CancellationToken,
760    ) -> Result<(String, ErasedPluginTaskOutcome), PluginOperationInvokeError> {
761        let Some(op) = self.contributions.plugin_tasks.get(name).cloned() else {
762            return Err(PluginOperationInvokeError::Unknown(name.to_string()));
763        };
764        let effective_session = self.effective_operation_session(
765            name,
766            op.def.session_param,
767            session_id,
768            default_to_current_session,
769        )?;
770        let outcome = (op.handler)(
771            PluginTaskContext {
772                session_id: effective_session,
773                sessions,
774                session_lifecycle,
775                session_graph,
776                processes,
777                scoped_effect_controller,
778                cancellation_token,
779            },
780            args,
781        )
782        .await
783        .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
784        Ok((op.plugin_id, outcome))
785    }
786}