1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use futures_util::stream::{FuturesUnordered, StreamExt};
5use sha2::{Digest, Sha256};
6
7use super::*;
8
9mod directives;
10mod tools;
11
12async fn collect_owned_async<C, O, H, F>(
13 hooks: &[RegisteredHook<H>],
14 ctx: C,
15 hook_kind: &'static str,
16 phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
17 invoke: F,
18) -> Result<Vec<PluginOwned<O>>, PluginError>
19where
20 C: Clone,
21 F: Fn(&H, C) -> PluginFuture<Vec<O>>,
22{
23 let mut out = Vec::new();
24 for registered in hooks {
25 let phase_name = plugin_hook_phase_name(hook_kind, ®istered.plugin_id);
26 if let Some(probe) = phase_probe {
27 probe.begin_named(&phase_name);
28 }
29 let result = invoke(®istered.hook, ctx.clone()).await;
30 if let Some(probe) = phase_probe {
31 probe.end_named(&phase_name);
32 }
33 for value in result? {
34 out.push(PluginOwned {
35 plugin_id: registered.plugin_id.clone(),
36 value,
37 });
38 }
39 }
40 Ok(out)
41}
42
/// Builds the dotted phase name reported to a phase probe for one plugin hook:
/// `plugin_hook.<hook_kind>.<plugin_id>`.
fn plugin_hook_phase_name(hook_kind: &str, plugin_id: &str) -> String {
    ["plugin_hook", hook_kind, plugin_id].join(".")
}
46
47fn lifecycle_event_hook_kind(event: &PluginLifecycleEvent<'_>) -> &'static str {
48 match event {
49 PluginLifecycleEvent::TurnFinalized(_) => "turn_finalized",
50 PluginLifecycleEvent::TurnPersisted(_) => "turn_persisted",
51 PluginLifecycleEvent::SessionRestored(_) => "session_restored",
52 PluginLifecycleEvent::SessionConfigChanged(_) => "session_config_changed",
53 }
54}
55
56fn collect_owned_sync<C, O, H, F>(
57 hooks: &[RegisteredHook<H>],
58 ctx: C,
59 invoke: F,
60) -> Result<Vec<PluginOwned<O>>, PluginError>
61where
62 C: Clone,
63 F: Fn(&H, C) -> Result<O, PluginError>,
64{
65 let mut out = Vec::new();
66 for registered in hooks {
67 out.push(PluginOwned {
68 plugin_id: registered.plugin_id.clone(),
69 value: invoke(®istered.hook, ctx.clone())?,
70 });
71 }
72 Ok(out)
73}
74
75struct EmptySnapshotReader;
76
77impl SnapshotReader for EmptySnapshotReader {
78 fn read_blob(&self, _name: &str) -> Option<&[u8]> {
79 None
80 }
81}
82
83pub struct PluginSession {
84 pub(super) host: PluginHost,
85 pub(super) session_id: String,
86 pub(super) plugins: Vec<Arc<dyn SessionPlugin>>,
87 pub(super) tools: Arc<dyn ToolProvider>,
88 pub(super) tool_registry: Arc<crate::ToolRegistry>,
89 pub(super) tool_catalog_overlay: ToolCatalogContribution,
90 pub(super) tool_access: SessionToolAccess,
91 pub(super) subagent: Option<SubagentSessionContext>,
92 pub(super) extensions: PluginExtensions,
93 pub(super) triggers: crate::TriggerEventCatalog,
94 pub(super) contributions: PluginContributions,
95}
96impl PluginSession {
97 pub fn session_id(&self) -> &str {
98 &self.session_id
99 }
100
101 pub fn tool_access(&self) -> &SessionToolAccess {
102 &self.tool_access
103 }
104
105 pub fn subagent_context(&self) -> Option<&SubagentSessionContext> {
106 self.subagent.as_ref()
107 }
108
109 pub fn extensions(&self) -> &PluginExtensions {
110 &self.extensions
111 }
112
113 pub fn triggers(&self) -> &crate::TriggerEventCatalog {
114 &self.triggers
115 }
116
117 pub fn host(&self) -> &PluginHost {
118 &self.host
119 }
120
121 pub fn tools(&self) -> Arc<dyn ToolProvider> {
122 Arc::clone(&self.tools)
123 }
124
125 pub fn tool_registry(&self) -> Arc<crate::ToolRegistry> {
126 Arc::clone(&self.tool_registry)
127 }
128
129 pub(crate) fn protocol_session(&self) -> &Arc<dyn ProtocolSessionPlugin> {
130 &self
131 .contributions
132 .protocol_session
133 .as_ref()
134 .expect("plugin session must have a protocol session")
135 .hook
136 }
137
138 pub(crate) fn code_executor(&self) -> Option<Arc<dyn CodeExecutorPlugin>> {
139 self.contributions
140 .code_executor
141 .as_ref()
142 .map(|entry| Arc::clone(&entry.hook))
143 }
144
145 pub(crate) fn assistant_prose_projector(
146 &self,
147 ) -> Option<Arc<dyn AssistantProseProjectorPlugin>> {
148 self.contributions
149 .assistant_prose_projector
150 .as_ref()
151 .map(|entry| Arc::clone(&entry.hook))
152 }
153
154 pub fn protocol_driver(&self) -> Arc<dyn ProtocolDriverPlugin> {
155 self.contributions
156 .protocol_driver
157 .as_ref()
158 .map(|entry| Arc::clone(&entry.hook))
159 .expect("plugin session must have a protocol driver")
160 }
161
162 pub fn plugin_operations(&self) -> Vec<PluginOperationDef> {
163 self.contributions
164 .plugin_queries
165 .values()
166 .map(|op| op.def.clone())
167 .chain(
168 self.contributions
169 .plugin_commands
170 .values()
171 .map(|op| op.def.clone()),
172 )
173 .chain(
174 self.contributions
175 .plugin_tasks
176 .values()
177 .map(|op| op.def.clone()),
178 )
179 .collect()
180 }
181
182 pub fn has_assistant_stream_hooks(&self) -> bool {
183 !self.contributions.assistant_stream_hooks.is_empty()
184 }
185
186 pub fn has_assistant_stream_finished_hooks(&self) -> bool {
187 !self
188 .contributions
189 .assistant_stream_finished_hooks
190 .is_empty()
191 }
192
193 pub async fn prepare_turn_context(
196 &self,
197 ctx: &TurnTransformContext<'_>,
198 input: crate::session_model::context::PreparedContext,
199 phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
200 ) -> Result<crate::session_model::context::PreparedContext, ContextError> {
201 let mut current = input;
202 for (_, registered) in &self.contributions.turn_context_transforms {
203 let phase_name =
204 plugin_hook_phase_name("context_transform", registered.plugin_id.as_str());
205 if let Some(probe) = phase_probe.as_ref() {
206 probe.begin_named(&phase_name);
207 }
208 let result = registered.hook.transform(ctx, current).await;
209 if let Some(probe) = phase_probe.as_ref() {
210 probe.end_named(&phase_name);
211 }
212 current = result?;
213 }
214 Ok(current)
215 }
216
217 pub async fn compact_context(
219 &self,
220 ctx: &CompactionContext<'_>,
221 ) -> Result<Option<ContextCompaction>, ContextError> {
222 for (_, registered) in &self.contributions.context_compactors {
223 if let Some(compaction) = registered.hook.compact(ctx).await?
224 && !compaction.is_empty()
225 {
226 return Ok(Some(compaction));
227 }
228 }
229 Ok(None)
230 }
231
232 pub async fn collect_prompt_contributions(
233 &self,
234 ctx: PromptHookContext,
235 ) -> Result<Vec<PromptContribution>, PluginError> {
236 let mut out = collect_owned_async(
237 &self.contributions.prompt_contributors,
238 ctx,
239 "prompt_contributor",
240 None,
241 |hook, ctx| hook(ctx),
242 )
243 .await?
244 .into_iter()
245 .map(|owned| owned.value)
246 .collect::<Vec<_>>();
247 let mut seen = BTreeSet::new();
248 out.retain(|contribution| {
249 seen.insert((
250 format!("{:?}", contribution.slot),
251 contribution.priority,
252 contribution.content.trim().to_string(),
253 ))
254 });
255 out.sort_by(|a, b| {
256 format!("{:?}", a.slot)
257 .cmp(&format!("{:?}", b.slot))
258 .then(a.priority.cmp(&b.priority))
259 });
260 Ok(out)
261 }
262
263 pub async fn before_turn(
264 &self,
265 ctx: TurnHookContext,
266 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
267 self.before_turn_with_phase_probe(ctx, None).await
268 }
269
270 async fn before_turn_with_phase_probe(
271 &self,
272 ctx: TurnHookContext,
273 phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
274 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
275 collect_owned_async(
276 &self.contributions.before_turn_hooks,
277 ctx,
278 "before_turn",
279 phase_probe,
280 |hook, ctx| hook(ctx),
281 )
282 .await
283 }
284
285 pub async fn before_tool_call(
286 &self,
287 ctx: ToolCallHookContext,
288 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
289 collect_owned_async(
290 &self.contributions.before_tool_call_hooks,
291 ctx,
292 "before_tool_call",
293 None,
294 |hook, ctx| hook(ctx),
295 )
296 .await
297 }
298
299 pub async fn after_tool_call(
300 &self,
301 ctx: ToolResultHookContext,
302 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
303 collect_owned_async(
304 &self.contributions.after_tool_call_hooks,
305 ctx,
306 "after_tool_call",
307 None,
308 |hook, ctx| hook(ctx),
309 )
310 .await
311 }
312
313 pub async fn after_turn(
314 &self,
315 ctx: TurnResultHookContext,
316 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
317 self.after_turn_with_phase_probe(ctx, None).await
318 }
319
320 async fn after_turn_with_phase_probe(
321 &self,
322 ctx: TurnResultHookContext,
323 phase_probe: Option<&Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
324 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
325 collect_owned_async(
326 &self.contributions.after_turn_hooks,
327 ctx,
328 "after_turn",
329 phase_probe,
330 |hook, ctx| hook(ctx),
331 )
332 .await
333 }
334
335 pub async fn at_checkpoint(
336 &self,
337 ctx: CheckpointHookContext,
338 ) -> Result<Vec<PluginOwned<PluginDirective>>, PluginError> {
339 collect_owned_async(
340 &self.contributions.checkpoint_hooks,
341 ctx,
342 "checkpoint",
343 None,
344 |hook, ctx| hook(ctx),
345 )
346 .await
347 }
348
349 pub async fn transform_assistant_stream(
350 &self,
351 session_id: &str,
352 chunk: String,
353 ) -> Result<Vec<PluginOwned<AssistantStreamTransform>>, PluginError> {
354 let mut current = chunk;
355 let mut transforms = Vec::new();
356 for registered in &self.contributions.assistant_stream_hooks {
357 let transform = (registered.hook)(AssistantStreamHookContext {
358 session_id: session_id.to_string(),
359 chunk: current.clone(),
360 })
361 .await?;
362 current = transform.chunk.clone();
363 transforms.push(PluginOwned {
364 plugin_id: registered.plugin_id.clone(),
365 value: transform,
366 });
367 }
368 Ok(transforms)
369 }
370
371 pub async fn transform_assistant_response(
372 &self,
373 session_id: &str,
374 response: crate::llm::types::LlmResponse,
375 ) -> Result<Vec<PluginOwned<AssistantResponseTransform>>, PluginError> {
376 let mut current = response;
377 let mut transforms = Vec::new();
378 for registered in &self.contributions.assistant_response_hooks {
379 let transform = (registered.hook)(AssistantResponseHookContext {
380 session_id: session_id.to_string(),
381 response: current.clone(),
382 })
383 .await?;
384 current = transform.response.clone();
385 transforms.push(PluginOwned {
386 plugin_id: registered.plugin_id.clone(),
387 value: transform,
388 });
389 }
390 Ok(transforms)
391 }
392
393 pub async fn finish_assistant_stream(
394 &self,
395 session_id: &str,
396 reason: AssistantStreamFinishReason,
397 ) -> Result<(), PluginError> {
398 for registered in &self.contributions.assistant_stream_finished_hooks {
399 (registered.hook)(AssistantStreamFinishedContext {
400 session_id: session_id.to_string(),
401 reason,
402 })
403 .await?;
404 }
405 Ok(())
406 }
407
408 pub async fn project_tool_result(
409 &self,
410 ctx: ToolResultProjectionContext,
411 ) -> Result<crate::ModelToolReturn, PluginError> {
412 let Some(projector) = &self.contributions.tool_result_projector else {
413 return Ok(crate::ModelToolReturn::from_output(
414 ctx.call_id.clone(),
415 ctx.tool_name.clone(),
416 &ctx.output,
417 ));
418 };
419 (projector.hook)(ctx).await
420 }
421
422 pub async fn emit_runtime_event(&self, event: PluginLifecycleEvent<'_>) {
423 self.emit_runtime_event_with_phase_probe(event, None).await;
424 }
425
426 pub async fn emit_runtime_event_with_phase_probe(
427 &self,
428 event: PluginLifecycleEvent<'_>,
429 phase_probe: Option<Arc<dyn crate::runtime::RuntimeTurnPhaseProbe>>,
430 ) {
431 let hook_kind = lifecycle_event_hook_kind(&event);
432 let mut pending = FuturesUnordered::new();
433 for registered in &self.contributions.runtime_event_hooks {
434 let hook = Arc::clone(®istered.hook);
435 let plugin_id = registered.plugin_id.clone();
436 let phase_name = plugin_hook_phase_name(hook_kind, registered.plugin_id.as_str());
437 let event = event.clone();
438 let phase_probe = phase_probe.clone();
439 pending.push(async move {
440 if let Some(probe) = phase_probe.as_ref() {
441 probe.begin_named(&phase_name);
442 }
443 let result = hook(event).await;
444 if let Some(probe) = phase_probe.as_ref() {
445 probe.end_named(&phase_name);
446 }
447 (plugin_id, result)
448 });
449 }
450 while let Some((plugin_id, result)) = pending.next().await {
451 if let Err(err) = result {
452 tracing::warn!(plugin_id, "plugin runtime event hook failed: {err}");
453 }
454 }
455 }
456
457 pub fn has_runtime_event_hooks(&self) -> bool {
458 !self.contributions.runtime_event_hooks.is_empty()
459 }
460
461 pub async fn mutate_session_config(
462 &self,
463 ctx: SessionConfigChangedContext,
464 mut policy: SessionPolicy,
465 ) -> SessionPolicy {
466 for hook in &self.contributions.session_config_mutators {
467 match hook(ctx.clone(), policy.clone()).await {
468 Ok(next_policy) => policy = next_policy,
469 Err(err) => tracing::warn!("plugin config mutator failed: {err}"),
470 }
471 }
472 policy
473 }
474
475 pub fn snapshot(&self) -> Result<PluginSessionSnapshot, PluginError> {
476 let mut plugins = BTreeMap::new();
477 for plugin in &self.plugins {
478 let mut writer = InMemorySnapshotWriter::default();
479 let meta = plugin.snapshot(&mut writer)?;
480 plugins.insert(
481 plugin.id().to_string(),
482 PluginSnapshotEntry {
483 meta,
484 artifacts: writer.finish(),
485 },
486 );
487 }
488 Ok(PluginSessionSnapshot { plugins })
489 }
490
491 pub fn snapshot_is_current(&self, previous: Option<&PluginSessionSnapshot>) -> bool {
492 let Some(previous) = previous else {
493 return false;
494 };
495 if previous.plugins.len() != self.plugins.len() {
496 return false;
497 }
498 for plugin in &self.plugins {
499 let Some(entry) = previous.plugins.get(plugin.id()) else {
500 return false;
501 };
502 if entry.meta.plugin_version != plugin.version()
503 || entry.meta.revision != plugin.snapshot_revision()
504 {
505 return false;
506 }
507 }
508 true
509 }
510
511 pub fn snapshot_revision_fingerprint(&self) -> u64 {
512 let mut hasher = Sha256::new();
513 for plugin in &self.plugins {
514 hasher.update(plugin.id().as_bytes());
515 hasher.update([0]);
516 hasher.update(plugin.version().as_bytes());
517 hasher.update([0]);
518 hasher.update(plugin.snapshot_revision().to_le_bytes());
519 hasher.update([0xff]);
520 }
521 let digest = hasher.finalize();
522 u64::from_le_bytes(digest[..8].try_into().expect("digest prefix"))
523 }
524
525 pub fn restore(&self, snapshot: &PluginSessionSnapshot) -> Result<(), PluginError> {
526 for plugin in &self.plugins {
527 if let Some(entry) = snapshot.plugins.get(plugin.id()) {
528 let reader = InMemorySnapshotReader { entry };
529 plugin.restore(&entry.meta, &reader)?;
530 } else {
531 plugin.restore(
532 &PluginSnapshotMeta {
533 plugin_id: plugin.id().to_string(),
534 plugin_version: plugin.version().to_string(),
535 revision: plugin.snapshot_revision(),
536 state: None,
537 },
538 &EmptySnapshotReader,
539 )?;
540 }
541 }
542 Ok(())
543 }
544
545 pub fn fork_for_session(
546 &self,
547 session_id: impl Into<String>,
548 ) -> Result<Arc<PluginSession>, PluginError> {
549 let snapshot = self.snapshot()?;
550 self.host.build_session_with_overlay(
551 session_id,
552 Some(&snapshot),
553 self.tool_catalog_overlay.clone(),
554 Some(self.tool_registry.export_state()),
555 )
556 }
557
558 pub fn fork_for_child_session(
559 &self,
560 session_id: impl Into<String>,
561 parent_session_id: Option<String>,
562 authority: super::SessionAuthorityContext,
563 ) -> Result<Arc<PluginSession>, PluginError> {
564 let snapshot = self.snapshot()?;
565 self.host.build_session_with_parent_and_overlay(
566 session_id,
567 parent_session_id,
568 Some(&snapshot),
569 self.tool_catalog_overlay.clone(),
570 Some(self.tool_registry.export_state()),
571 authority,
572 )
573 }
574
575 pub fn fork_for_session_with_tool_catalog(
576 &self,
577 session_id: impl Into<String>,
578 tool_catalog_overlay: ToolCatalogContribution,
579 ) -> Result<Arc<PluginSession>, PluginError> {
580 let snapshot = self.snapshot()?;
581 self.host.build_session_with_overlay(
582 session_id,
583 Some(&snapshot),
584 tool_catalog_overlay,
585 Some(self.tool_registry.export_state()),
586 )
587 }
588
589 fn effective_operation_session(
590 &self,
591 name: &str,
592 session_param: SessionParam,
593 session_id: Option<String>,
594 default_to_current_session: bool,
595 ) -> Result<Option<String>, PluginOperationInvokeError> {
596 let effective_session = session_id.or_else(|| {
597 if default_to_current_session && !self.session_id.is_empty() {
598 Some(self.session_id.clone())
599 } else {
600 None
601 }
602 });
603
604 match (session_param, effective_session.as_ref()) {
605 (SessionParam::Required, None) => {
606 return Err(PluginOperationInvokeError::MissingSession(name.to_string()));
607 }
608 (SessionParam::Forbidden, Some(_)) => {
609 return Err(PluginOperationInvokeError::UnexpectedSession(
610 name.to_string(),
611 ));
612 }
613 _ => {}
614 }
615 Ok(effective_session)
616 }
617
618 pub(crate) async fn query_plugin(
619 &self,
620 name: &str,
621 args: serde_json::Value,
622 session_id: Option<String>,
623 default_to_current_session: bool,
624 sessions: Arc<dyn SessionReadService>,
625 processes: Arc<dyn ProcessReadService>,
626 ) -> Result<(String, serde_json::Value), PluginOperationInvokeError> {
627 let Some(op) = self.contributions.plugin_queries.get(name).cloned() else {
628 return Err(PluginOperationInvokeError::Unknown(name.to_string()));
629 };
630 let effective_session = self.effective_operation_session(
631 name,
632 op.def.session_param,
633 session_id,
634 default_to_current_session,
635 )?;
636 let output = (op.handler)(
637 PluginQueryContext {
638 session_id: effective_session,
639 sessions,
640 processes,
641 },
642 args,
643 )
644 .await
645 .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
646 Ok((op.plugin_id, output))
647 }
648
649 #[expect(
650 clippy::too_many_arguments,
651 reason = "plugin command invocation carries the runtime mutation services exposed to commands"
652 )]
653 pub async fn run_plugin_command_value(
654 &self,
655 name: &str,
656 args: serde_json::Value,
657 session_id: Option<String>,
658 default_to_current_session: bool,
659 sessions: Arc<dyn SessionStateService>,
660 session_lifecycle: Arc<dyn SessionLifecycleService>,
661 session_graph: Arc<dyn SessionGraphService>,
662 processes: Arc<dyn crate::ProcessService>,
663 ) -> Result<(String, PluginCommandOutcome<serde_json::Value>), PluginOperationInvokeError> {
664 let (plugin_id, outcome) = self
665 .run_plugin_command(
666 name,
667 args,
668 session_id,
669 default_to_current_session,
670 sessions,
671 session_lifecycle,
672 session_graph,
673 processes,
674 )
675 .await?;
676 Ok((
677 plugin_id,
678 PluginCommandOutcome {
679 output: outcome.output,
680 events: outcome.events,
681 directives: outcome.directives,
682 },
683 ))
684 }
685
686 #[expect(
687 clippy::too_many_arguments,
688 reason = "plugin command invocation carries the runtime mutation services exposed to commands"
689 )]
690 pub(crate) async fn run_plugin_command(
691 &self,
692 name: &str,
693 args: serde_json::Value,
694 session_id: Option<String>,
695 default_to_current_session: bool,
696 sessions: Arc<dyn SessionStateService>,
697 session_lifecycle: Arc<dyn SessionLifecycleService>,
698 session_graph: Arc<dyn SessionGraphService>,
699 processes: Arc<dyn crate::ProcessService>,
700 ) -> Result<(String, ErasedPluginCommandOutcome), PluginOperationInvokeError> {
701 let Some(op) = self.contributions.plugin_commands.get(name).cloned() else {
702 return Err(PluginOperationInvokeError::Unknown(name.to_string()));
703 };
704 let effective_session = self.effective_operation_session(
705 name,
706 op.def.session_param,
707 session_id,
708 default_to_current_session,
709 )?;
710 let outcome = (op.handler)(
711 PluginCommandContext {
712 session_id: effective_session,
713 sessions,
714 session_lifecycle,
715 session_graph,
716 processes,
717 },
718 args,
719 )
720 .await
721 .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
722 Ok((op.plugin_id, outcome))
723 }
724
725 #[expect(
726 clippy::too_many_arguments,
727 reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
728 )]
729 pub async fn run_plugin_task_value(
730 &self,
731 name: &str,
732 args: serde_json::Value,
733 session_id: Option<String>,
734 default_to_current_session: bool,
735 sessions: Arc<dyn SessionStateService>,
736 session_lifecycle: Arc<dyn SessionLifecycleService>,
737 session_graph: Arc<dyn SessionGraphService>,
738 processes: Arc<dyn crate::ProcessService>,
739 scoped_effect_controller: crate::ScopedEffectController<'static>,
740 cancellation_token: tokio_util::sync::CancellationToken,
741 ) -> Result<(String, PluginTaskOutcome<serde_json::Value>), PluginOperationInvokeError> {
742 let (plugin_id, outcome) = self
743 .run_plugin_task(
744 name,
745 args,
746 session_id,
747 default_to_current_session,
748 sessions,
749 session_lifecycle,
750 session_graph,
751 processes,
752 scoped_effect_controller,
753 cancellation_token,
754 )
755 .await?;
756 Ok((
757 plugin_id,
758 PluginTaskOutcome {
759 output: outcome.output,
760 events: outcome.events,
761 directives: outcome.directives,
762 },
763 ))
764 }
765
766 #[expect(
767 clippy::too_many_arguments,
768 reason = "plugin task invocation carries mutation services plus the scoped effect boundary"
769 )]
770 pub(crate) async fn run_plugin_task(
771 &self,
772 name: &str,
773 args: serde_json::Value,
774 session_id: Option<String>,
775 default_to_current_session: bool,
776 sessions: Arc<dyn SessionStateService>,
777 session_lifecycle: Arc<dyn SessionLifecycleService>,
778 session_graph: Arc<dyn SessionGraphService>,
779 processes: Arc<dyn crate::ProcessService>,
780 scoped_effect_controller: crate::ScopedEffectController<'static>,
781 cancellation_token: tokio_util::sync::CancellationToken,
782 ) -> Result<(String, ErasedPluginTaskOutcome), PluginOperationInvokeError> {
783 let Some(op) = self.contributions.plugin_tasks.get(name).cloned() else {
784 return Err(PluginOperationInvokeError::Unknown(name.to_string()));
785 };
786 let effective_session = self.effective_operation_session(
787 name,
788 op.def.session_param,
789 session_id,
790 default_to_current_session,
791 )?;
792 let outcome = (op.handler)(
793 PluginTaskContext {
794 session_id: effective_session,
795 sessions,
796 session_lifecycle,
797 session_graph,
798 processes,
799 scoped_effect_controller,
800 cancellation_token,
801 },
802 args,
803 )
804 .await
805 .map_err(|err| PluginOperationInvokeError::Failed(err.to_string()))?;
806 Ok((op.plugin_id, outcome))
807 }
808}