1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use futures_util::stream::{FuturesUnordered, StreamExt};
5use sha2::{Digest, Sha256};
6
7use super::*;
8
9mod directives;
10mod tools;
11
12async fn collect_owned_async<C, O, H, F>(
13 hooks: &[RegisteredHook<H>],
14 ctx: C,
15 hook_kind: &'static str,
16 phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
17 invoke: F,
18) -> Result<Vec<PluginOwned<O>>, PluginError>
19where
20 C: Clone,
21 F: Fn(&H, C) -> PluginFuture<Vec<O>>,
22{
23 let mut out = Vec::new();
24 for registered in hooks {
25 let phase_name = plugin_hook_phase_name(hook_kind, ®istered.plugin_id);
26 if let Some(probe) = phase_probe {
27 probe.begin_named(&phase_name);
28 }
29 let result = invoke(®istered.hook, ctx.clone()).await;
30 if let Some(probe) = phase_probe {
31 probe.end_named(&phase_name);
32 }
33 for value in result? {
34 out.push(PluginOwned {
35 plugin_id: registered.plugin_id.clone(),
36 value,
37 });
38 }
39 }
40 Ok(out)
41}
42
43fn plugin_hook_phase_name(hook_kind: &str, plugin_id: &str) -> String {
44 format!("plugin_hook.{hook_kind}.{plugin_id}")
45}
46
47fn lifecycle_event_hook_kind(event: &PluginLifecycleEvent<'_>) -> &'static str {
48 match event {
49 PluginLifecycleEvent::TurnFinalized(_) => "turn_finalized",
50 PluginLifecycleEvent::TurnPersisted(_) => "turn_persisted",
51 PluginLifecycleEvent::SessionRestored(_) => "session_restored",
52 PluginLifecycleEvent::SessionConfigChanged(_) => "session_config_changed",
53 }
54}
55
56fn collect_owned_sync<C, O, H, F>(
57 hooks: &[RegisteredHook<H>],
58 ctx: C,
59 invoke: F,
60) -> Result<Vec<PluginOwned<O>>, PluginError>
61where
62 C: Clone,
63 F: Fn(&H, C) -> Result<O, PluginError>,
64{
65 let mut out = Vec::new();
66 for registered in hooks {
67 out.push(PluginOwned {
68 plugin_id: registered.plugin_id.clone(),
69 value: invoke(®istered.hook, ctx.clone())?,
70 });
71 }
72 Ok(out)
73}
74
75struct EmptySnapshotReader;
76
77impl SnapshotReader for EmptySnapshotReader {
78 fn read_blob(&self, _name: &str) -> Option<&[u8]> {
79 None
80 }
81}
82
83pub struct PluginSession {
84 pub(super) host: PluginHost,
85 pub(super) session_id: String,
86 pub(super) plugins: Vec<Arc<dyn SessionPlugin>>,
87 pub(super) tools: Arc<dyn ToolProvider>,
88 pub(super) tool_registry: Arc<crate::ToolRegistry>,
89 pub(super) tool_catalog_overlay: ToolCatalogContribution,
90 pub(super) tool_access: SessionToolAccess,
91 pub(super) subagent: Option<SubagentSessionContext>,
92 pub(super) extensions: PluginExtensions,
93 pub(super) triggers: crate::TriggerEventCatalog,
94 pub(super) contributions: PluginContributions,
95}
96impl PluginSession {
97 pub fn session_id(&self) -> &str {
98 &self.session_id
99 }
100
101 pub fn tool_access(&self) -> &SessionToolAccess {
102 &self.tool_access
103 }
104
105 pub fn subagent_context(&self) -> Option<&SubagentSessionContext> {
106 self.subagent.as_ref()
107 }
108
109 pub fn extensions(&self) -> &PluginExtensions {
110 &self.extensions
111 }
112
113 pub fn triggers(&self) -> &crate::TriggerEventCatalog {
114 &self.triggers
115 }
116
117 pub fn host(&self) -> &PluginHost {
118 &self.host
119 }
120
121 pub fn tools(&self) -> Arc<dyn ToolProvider> {
122 Arc::clone(&self.tools)
123 }
124
125 pub fn tool_registry(&self) -> Arc<crate::ToolRegistry> {
126 Arc::clone(&self.tool_registry)
127 }
128
129 pub(crate) fn protocol_session(&self) -> &Arc<dyn ProtocolSessionPlugin> {
130 &self
131 .contributions
132 .protocol_session
133 .as_ref()
134 .expect("plugin session must have a protocol session")
135 .hook
136 }
137
138 pub(crate) fn code_executor(&self) -> Option<Arc<dyn CodeExecutorPlugin>> {
139 self.contributions
140 .code_executor
141 .as_ref()
142 .map(|entry| Arc::clone(&entry.hook))
143 }
144
145 pub(crate) fn assistant_prose_projector(
146 &self,
147 ) -> Option<Arc<dyn AssistantProseProjectorPlugin>> {
148 self.contributions
149 .assistant_prose_projector
150 .as_ref()
151 .map(|entry| Arc::clone(&entry.hook))
152 }
153
154 pub fn protocol_driver(&self) -> Arc<dyn ProtocolDriverPlugin> {
155 self.contributions
156 .protocol_driver
157 .as_ref()
158 .map(|entry| Arc::clone(&entry.hook))
159 .expect("plugin session must have a protocol driver")
160 }
161
162 pub fn plugin_operations(&self) -> Vec<PluginOperationDef> {
163 self.contributions
164 .plugin_queries
165 .values()
166 .map(|op| op.def.clone())
167 .chain(
168 self.contributions
169 .plugin_commands
170 .values()
171 .map(|op| op.def.clone()),
172 )
173 .chain(
174 self.contributions
175 .plugin_tasks
176 .values()
177 .map(|op| op.def.clone()),
178 )
179 .collect()
180 }
181
182 pub fn has_assistant_stream_hooks(&self) -> bool {
183 !self.contributions.assistant_stream_hooks.is_empty()
184 }
185
186 pub async fn prepare_turn_context(
189 &self,
190 ctx: &TurnTransformContext<'_>,
191 input: crate::session_model::context::PreparedContext,
192 phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
193 ) -> Result<crate::session_model::context::PreparedContext, ContextError> {
194 let mut current = input;
195 for (_, registered) in &self.contributions.turn_context_transforms {
196 let phase_name =
197 plugin_hook_phase_name("context_transform", registered.plugin_id.as_str());
198 if let Some(probe) = phase_probe.as_ref() {
199 probe.begin_named(&phase_name);
200 }
201 let result = registered.hook.transform(ctx, current).await;
202 if let Some(probe) = phase_probe.as_ref() {
203 probe.end_named(&phase_name);
204 }
205 current = result?;
206 }
207 Ok(current)
208 }
209
210 pub async fn compact_context(
212 &self,
213 ctx: &CompactionContext<'_>,
214 ) -> Result<Option<ContextCompaction>, ContextError> {
215 for (_, registered) in &self.contributions.context_compactors {
216 if let Some(compaction) = registered.hook.compact(ctx).await?
217 && !compaction.is_empty()
218 {
219 return Ok(Some(compaction));
220 }
221 }
222 Ok(None)
223 }
224
225 pub async fn collect_prompt_contributions(
226 &self,
227 ctx: PromptHookContext,
228 ) -> Result<Vec<PromptContribution>, PluginError> {
229 let mut out = collect_owned_async(
230 &self.contributions.prompt_contributors,
231 ctx,
232 "prompt_contributor",
233 None,
234 |hook, ctx| hook(ctx),
235 )
236 .await?
237 .into_iter()
238 .map(|owned| owned.value)
239 .collect::<Vec<_>>();
240 let mut seen = BTreeSet::new();
241 out.retain(|contribution| {
242 seen.insert((
243 format!("{:?}", contribution.slot),
244 contribution.priority,
245 contribution.content.trim().to_string(),
246 ))
247 });
248 out.sort_by(|a, b| {
249 format!("{:?}", a.slot)
250 .cmp(&format!("{:?}", b.slot))
251 .then(a.priority.cmp(&b.priority))
252 });
253 Ok(out)
254 }
255
256 pub async fn before_turn(
257 &self,
258 ctx: TurnHookContext,
259 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
260 self.before_turn_with_phase_probe(ctx, None).await
261 }
262
263 async fn before_turn_with_phase_probe(
264 &self,
265 ctx: TurnHookContext,
266 phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
267 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
268 collect_owned_async(
269 &self.contributions.before_turn_hooks,
270 ctx,
271 "before_turn",
272 phase_probe,
273 |hook, ctx| hook(ctx),
274 )
275 .await
276 }
277
278 pub async fn before_tool_call(
279 &self,
280 ctx: ToolCallHookContext,
281 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
282 collect_owned_async(
283 &self.contributions.before_tool_call_hooks,
284 ctx,
285 "before_tool_call",
286 None,
287 |hook, ctx| hook(ctx),
288 )
289 .await
290 }
291
292 pub async fn after_tool_call(
293 &self,
294 ctx: ToolResultHookContext,
295 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
296 collect_owned_async(
297 &self.contributions.after_tool_call_hooks,
298 ctx,
299 "after_tool_call",
300 None,
301 |hook, ctx| hook(ctx),
302 )
303 .await
304 }
305
306 pub async fn after_turn(
307 &self,
308 ctx: TurnResultHookContext,
309 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
310 self.after_turn_with_phase_probe(ctx, None).await
311 }
312
313 async fn after_turn_with_phase_probe(
314 &self,
315 ctx: TurnResultHookContext,
316 phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
317 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
318 collect_owned_async(
319 &self.contributions.after_turn_hooks,
320 ctx,
321 "after_turn",
322 phase_probe,
323 |hook, ctx| hook(ctx),
324 )
325 .await
326 }
327
328 pub async fn at_checkpoint(
329 &self,
330 ctx: CheckpointHookContext,
331 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
332 collect_owned_async(
333 &self.contributions.checkpoint_hooks,
334 ctx,
335 "checkpoint",
336 None,
337 |hook, ctx| hook(ctx),
338 )
339 .await
340 }
341
342 pub async fn transform_assistant_stream(
343 &self,
344 session_id: &str,
345 chunk: String,
346 ) -> Result<Vec<PluginOwned<AssistantStreamTransform>>, PluginError> {
347 let mut current = chunk;
348 let mut transforms = Vec::new();
349 for registered in &self.contributions.assistant_stream_hooks {
350 let transform = (registered.hook)(AssistantStreamHookContext {
351 session_id: session_id.to_string(),
352 chunk: current.clone(),
353 })
354 .await?;
355 current = transform.chunk.clone();
356 transforms.push(PluginOwned {
357 plugin_id: registered.plugin_id.clone(),
358 value: transform,
359 });
360 }
361 Ok(transforms)
362 }
363
364 pub async fn transform_assistant_response(
365 &self,
366 session_id: &str,
367 response: crate::llm::types::LlmResponse,
368 ) -> Result<Vec<PluginOwned<AssistantResponseTransform>>, PluginError> {
369 let mut current = response;
370 let mut transforms = Vec::new();
371 for registered in &self.contributions.assistant_response_hooks {
372 let transform = (registered.hook)(AssistantResponseHookContext {
373 session_id: session_id.to_string(),
374 response: current.clone(),
375 })
376 .await?;
377 current = transform.response.clone();
378 transforms.push(PluginOwned {
379 plugin_id: registered.plugin_id.clone(),
380 value: transform,
381 });
382 }
383 Ok(transforms)
384 }
385
386 pub async fn project_tool_result(
387 &self,
388 ctx: ToolResultProjectionContext,
389 ) -> Result<crate::ModelToolReturn, PluginError> {
390 let Some(projector) = &self.contributions.tool_result_projector else {
391 return Ok(crate::ModelToolReturn::from_output(
392 ctx.call_id.clone(),
393 ctx.tool_name.clone(),
394 &ctx.output,
395 ));
396 };
397 (projector.hook)(ctx).await
398 }
399
400 pub async fn emit_runtime_event(&self, event: PluginLifecycleEvent<'_>) {
401 self.emit_runtime_event_with_phase_probe(event, None).await;
402 }
403
404 pub async fn emit_runtime_event_with_phase_probe(
405 &self,
406 event: PluginLifecycleEvent<'_>,
407 phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
408 ) {
409 let hook_kind = lifecycle_event_hook_kind(&event);
410 let mut pending = FuturesUnordered::new();
411 for registered in &self.contributions.runtime_event_hooks {
412 let hook = Arc::clone(®istered.hook);
413 let plugin_id = registered.plugin_id.clone();
414 let phase_name = plugin_hook_phase_name(hook_kind, registered.plugin_id.as_str());
415 let event = event.clone();
416 let phase_probe = phase_probe.clone();
417 pending.push(async move {
418 if let Some(probe) = phase_probe.as_ref() {
419 probe.begin_named(&phase_name);
420 }
421 let result = hook(event).await;
422 if let Some(probe) = phase_probe.as_ref() {
423 probe.end_named(&phase_name);
424 }
425 (plugin_id, result)
426 });
427 }
428 while let Some((plugin_id, result)) = pending.next().await {
429 if let Err(err) = result {
430 tracing::warn!(plugin_id, "plugin runtime event hook failed: {err}");
431 }
432 }
433 }
434
435 pub fn has_runtime_event_hooks(&self) -> bool {
436 !self.contributions.runtime_event_hooks.is_empty()
437 }
438
439 pub async fn mutate_session_config(
440 &self,
441 ctx: SessionConfigChangedContext,
442 mut policy: SessionPolicy,
443 ) -> SessionPolicy {
444 for hook in &self.contributions.session_config_mutators {
445 match hook(ctx.clone(), policy.clone()).await {
446 Ok(next_policy) => policy = next_policy,
447 Err(err) => tracing::warn!("plugin config mutator failed: {err}"),
448 }
449 }
450 policy
451 }
452
453 pub fn snapshot(&self) -> Result<PluginSessionSnapshot, PluginError> {
454 let mut plugins = BTreeMap::new();
455 for plugin in &self.plugins {
456 let mut writer = InMemorySnapshotWriter::default();
457 let meta = plugin.snapshot(&mut writer)?;
458 plugins.insert(
459 plugin.id().to_string(),
460 PluginSnapshotEntry {
461 meta,
462 artifacts: writer.finish(),
463 },
464 );
465 }
466 Ok(PluginSessionSnapshot { plugins })
467 }
468
469 pub fn snapshot_is_current(&self, previous: Option<&PluginSessionSnapshot>) -> bool {
470 let Some(previous) = previous else {
471 return false;
472 };
473 if previous.plugins.len() != self.plugins.len() {
474 return false;
475 }
476 for plugin in &self.plugins {
477 let Some(entry) = previous.plugins.get(plugin.id()) else {
478 return false;
479 };
480 if entry.meta.plugin_version != plugin.version()
481 || entry.meta.revision != plugin.snapshot_revision()
482 {
483 return false;
484 }
485 }
486 true
487 }
488
489 pub fn snapshot_revision_fingerprint(&self) -> u64 {
490 let mut hasher = Sha256::new();
491 for plugin in &self.plugins {
492 hasher.update(plugin.id().as_bytes());
493 hasher.update([0]);
494 hasher.update(plugin.version().as_bytes());
495 hasher.update([0]);
496 hasher.update(plugin.snapshot_revision().to_le_bytes());
497 hasher.update([0xff]);
498 }
499 let digest = hasher.finalize();
500 u64::from_le_bytes(digest[..8].try_into().expect("digest prefix"))
501 }
502
503 pub fn restore(&self, snapshot: &PluginSessionSnapshot) -> Result<(), PluginError> {
504 for plugin in &self.plugins {
505 if let Some(entry) = snapshot.plugins.get(plugin.id()) {
506 let reader = InMemorySnapshotReader { entry };
507 plugin.restore(&entry.meta, &reader)?;
508 } else {
509 plugin.restore(
510 &PluginSnapshotMeta {
511 plugin_id: plugin.id().to_string(),
512 plugin_version: plugin.version().to_string(),
513 revision: plugin.snapshot_revision(),
514 state: None,
515 },
516 &EmptySnapshotReader,
517 )?;
518 }
519 }
520 Ok(())
521 }
522
523 pub fn fork_for_session(
524 &self,
525 session_id: impl Into<String>,
526 ) -> Result<Arc<PluginSession>, PluginError> {
527 let snapshot = self.snapshot()?;
528 self.host.build_session_with_overlay(
529 session_id,
530 Some(&snapshot),
531 self.tool_catalog_overlay.clone(),
532 Some(self.tool_registry.export_state()),
533 )
534 }
535
536 pub fn fork_for_child_session(
537 &self,
538 session_id: impl Into<String>,
539 parent_session_id: Option<String>,
540 authority: super::SessionAuthorityContext,
541 ) -> Result<Arc<PluginSession>, PluginError> {
542 let snapshot = self.snapshot()?;
543 self.host.build_session_with_parent_and_overlay(
544 session_id,
545 parent_session_id,
546 Some(&snapshot),
547 self.tool_catalog_overlay.clone(),
548 Some(self.tool_registry.export_state()),
549 authority,
550 )
551 }
552
553 pub fn fork_for_session_with_tool_catalog(
554 &self,
555 session_id: impl Into<String>,
556 tool_catalog_overlay: ToolCatalogContribution,
557 ) -> Result<Arc<PluginSession>, PluginError> {
558 let snapshot = self.snapshot()?;
559 self.host.build_session_with_overlay(
560 session_id,
561 Some(&snapshot),
562 tool_catalog_overlay,
563 Some(self.tool_registry.export_state()),
564 )
565 }
566
567 fn effective_operation_session(
568 &self,
569 name: &str,
570 session_param: SessionParam,
571 session_id: Option<String>,
572 default_to_current_session: bool,
573 ) -> Result<Option<String>, PluginOperationInvokeError> {
574 let effective_session = session_id.or_else(|| {
575 if default_to_current_session && !self.session_id.is_empty() {
576 Some(self.session_id.clone())
577 } else {
578 None
579 }
580 });
581
582 match (session_param, effective_session.as_ref()) {
583 (SessionParam::Required, None) => {
584 return Err(PluginOperationInvokeError::MissingSession(name.to_string()));
585 }
586 (SessionParam::Forbidden, Some(_)) => {
587 return Err(PluginOperationInvokeError::UnexpectedSession(
588 name.to_string(),
589 ));
590 }
591 _ => {}
592 }
593 Ok(effective_session)
594 }
595
596 pub(crate) async fn query_plugin(
597 &self,
598 name: &str,
599 args: serde_json::Value,
600 session_id: Option<String>,
601 default_to_current_session: bool,
602 sessions: Arc<dyn SessionReadService>,
603 processes: Arc<dyn ProcessReadService>,
604 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
605 let Some(op) = self.contributions.plugin_queries.get(name).cloned() else {
606 return Err(PluginOperationInvokeError::Unknown(name.to_string()));
607 };
608 let effective_session = self.effective_operation_session(
609 name,
610 op.def.session_param,
611 session_id,
612 default_to_current_session,
613 )?;
614 let output = (op.handler)(
615 PluginQueryContext {
616 session_id: effective_session,
617 sessions,
618 processes,
619 },
620 args,
621 )
622 .await
623 .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
624 Ok((op.plugin_id, output))
625 }
626
627 #[expect(
628 clippy::too_many_arguments,
629 reason = "plugin command invocation carries the runtime mutation services exposed to commands"
630 )]
631 pub async fn run_plugin_command_value(
632 &self,
633 name: &str,
634 args: serde_json::Value,
635 session_id: Option<String>,
636 default_to_current_session: bool,
637 sessions: Arc<dyn SessionStateService>,
638 session_lifecycle: Arc<dyn SessionLifecycleService>,
639 session_graph: Arc<dyn SessionGraphService>,
640 processes: Arc<dyn crate::ProcessService>,
641 ) -> Result<(String, PluginCommandOutcome<serde_json::Value>), PluginOperationInvokeError> {
642 let (plugin_id, outcome) = self
643 .run_plugin_command(
644 name,
645 args,
646 session_id,
647 default_to_current_session,
648 sessions,
649 session_lifecycle,
650 session_graph,
651 processes,
652 )
653 .await?;
654 Ok((
655 plugin_id,
656 PluginCommandOutcome {
657 output: outcome.output,
658 events: outcome.events,
659 directives: outcome.directives,
660 },
661 ))
662 }
663
664 #[expect(
665 clippy::too_many_arguments,
666 reason = "plugin command invocation carries the runtime mutation services exposed to commands"
667 )]
668 pub(crate) async fn run_plugin_command(
669 &self,
670 name: &str,
671 args: serde_json::Value,
672 session_id: Option<String>,
673 default_to_current_session: bool,
674 sessions: Arc<dyn SessionStateService>,
675 session_lifecycle: Arc<dyn SessionLifecycleService>,
676 session_graph: Arc<dyn SessionGraphService>,
677 processes: Arc<dyn crate::ProcessService>,
678 ) -> Result<(String, ErasedPluginCommandOutcome), PluginOperationInvokeError> {
679 let Some(op) = self.contributions.plugin_commands.get(name).cloned() else {
680 return Err(PluginOperationInvokeError::Unknown(name.to_string()));
681 };
682 let effective_session = self.effective_operation_session(
683 name,
684 op.def.session_param,
685 session_id,
686 default_to_current_session,
687 )?;
688 let outcome = (op.handler)(
689 PluginCommandContext {
690 session_id: effective_session,
691 sessions,
692 session_lifecycle,
693 session_graph,
694 processes,
695 },
696 args,
697 )
698 .await
699 .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
700 Ok((op.plugin_id, outcome))
701 }
702
703 #[expect(
704 clippy::too_many_arguments,
705 reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
706 )]
707 pub async fn run_plugin_task_value(
708 &self,
709 name: &str,
710 args: serde_json::Value,
711 session_id: Option<String>,
712 default_to_current_session: bool,
713 sessions: Arc<dyn SessionStateService>,
714 session_lifecycle: Arc<dyn SessionLifecycleService>,
715 session_graph: Arc<dyn SessionGraphService>,
716 processes: Arc<dyn crate::ProcessService>,
717 scoped_effect_controller: crate::ScopedEffectController<'static>,
718 cancellation_token: tokio_util::sync::CancellationToken,
719 ) -> Result<(String, PluginTaskOutcome<serde_json::Value>), PluginOperationInvokeError> {
720 let (plugin_id, outcome) = self
721 .run_plugin_task(
722 name,
723 args,
724 session_id,
725 default_to_current_session,
726 sessions,
727 session_lifecycle,
728 session_graph,
729 processes,
730 scoped_effect_controller,
731 cancellation_token,
732 )
733 .await?;
734 Ok((
735 plugin_id,
736 PluginTaskOutcome {
737 output: outcome.output,
738 events: outcome.events,
739 directives: outcome.directives,
740 },
741 ))
742 }
743
744 #[expect(
745 clippy::too_many_arguments,
746 reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
747 )]
748 pub(crate) async fn run_plugin_task(
749 &self,
750 name: &str,
751 args: serde_json::Value,
752 session_id: Option<String>,
753 default_to_current_session: bool,
754 sessions: Arc<dyn SessionStateService>,
755 session_lifecycle: Arc<dyn SessionLifecycleService>,
756 session_graph: Arc<dyn SessionGraphService>,
757 processes: Arc<dyn crate::ProcessService>,
758 scoped_effect_controller: crate::ScopedEffectController<'static>,
759 cancellation_token: tokio_util::sync::CancellationToken,
760 ) -> Result<(String, ErasedPluginTaskOutcome), PluginOperationInvokeError> {
761 let Some(op) = self.contributions.plugin_tasks.get(name).cloned() else {
762 return Err(PluginOperationInvokeError::Unknown(name.to_string()));
763 };
764 let effective_session = self.effective_operation_session(
765 name,
766 op.def.session_param,
767 session_id,
768 default_to_current_session,
769 )?;
770 let outcome = (op.handler)(
771 PluginTaskContext {
772 session_id: effective_session,
773 sessions,
774 session_lifecycle,
775 session_graph,
776 processes,
777 scoped_effect_controller,
778 cancellation_token,
779 },
780 args,
781 )
782 .await
783 .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
784 Ok((op.plugin_id, outcome))
785 }
786}