1use std::sync::Arc;
11
12use super::{
13 AfterToolCallHook, AfterTurnHook, AssistantResponseHook, AssistantStreamFinishedHook,
14 AssistantStreamHook, BeforeToolCallHook, BeforeTurnHook, CheckpointHook, ContextCompactor,
15 PluginCommand, PluginCommandHandler, PluginCommandInvokeFuture, PluginCommandOutcome,
16 PluginError, PluginHost, PluginLifecycleEventHook, PluginOperationDef, PluginOperationFailure,
17 PluginOperationKind, PluginQuery, PluginQueryHandler, PluginQueryInvokeFuture, PluginRegistrar,
18 PluginSnapshotMeta, PluginTask, PluginTaskHandler, PluginTaskInvokeFuture, PluginTaskOutcome,
19 PromptContributor, SessionConfigMutator, SessionToolAccess, SnapshotReader, SnapshotWriter,
20 SubagentSessionContext, ToolCatalogContributor, ToolResultProjector, TurnContextTransform,
21};
22use crate::{PluginOptions, ToolProvider};
23
24#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
25pub struct PluginExtensionContribution {
26 pub extension_id: String,
27 #[serde(default)]
28 pub payload: serde_json::Value,
29}
30
31impl PluginExtensionContribution {
32 pub fn new(
33 extension_id: impl Into<String>,
34 payload: impl serde::Serialize,
35 ) -> Result<Self, serde_json::Error> {
36 Ok(Self {
37 extension_id: extension_id.into(),
38 payload: serde_json::to_value(payload)?,
39 })
40 }
41
42 pub fn from_value(extension_id: impl Into<String>, payload: serde_json::Value) -> Self {
43 Self {
44 extension_id: extension_id.into(),
45 payload,
46 }
47 }
48}
49
50#[derive(Clone, Debug, Default, PartialEq, Eq)]
51pub struct PluginExtensions {
52 contributions: std::collections::BTreeMap<String, Vec<serde_json::Value>>,
53}
54
55impl PluginExtensions {
56 pub fn from_contributions(
57 contributions: impl IntoIterator<Item = PluginExtensionContribution>,
58 ) -> Self {
59 let mut extensions = Self::default();
60 for contribution in contributions {
61 extensions.insert(contribution);
62 }
63 extensions
64 }
65
66 pub fn insert(&mut self, contribution: PluginExtensionContribution) {
67 self.contributions
68 .entry(contribution.extension_id)
69 .or_default()
70 .push(contribution.payload);
71 }
72
73 pub fn payloads(&self, extension_id: &str) -> &[serde_json::Value] {
74 self.contributions
75 .get(extension_id)
76 .map(Vec::as_slice)
77 .unwrap_or(&[])
78 }
79
80 pub fn is_empty(&self) -> bool {
81 self.contributions.is_empty()
82 }
83}
84
85#[derive(Clone, Default)]
86pub struct PluginSpec {
87 pub tool_providers: Vec<Arc<dyn ToolProvider>>,
88 pub triggers: Vec<crate::TriggerEvent>,
89 pub prompt_contributors: Vec<PromptContributor>,
90 pub tool_catalog_contributors: Vec<ToolCatalogContributor>,
91 pub before_turn_hooks: Vec<BeforeTurnHook>,
92 pub before_tool_call_hooks: Vec<BeforeToolCallHook>,
93 pub after_tool_call_hooks: Vec<AfterToolCallHook>,
94 pub after_turn_hooks: Vec<AfterTurnHook>,
95 pub checkpoint_hooks: Vec<CheckpointHook>,
96 pub assistant_stream_hooks: Vec<AssistantStreamHook>,
97 pub assistant_response_hooks: Vec<AssistantResponseHook>,
98 pub assistant_stream_finished_hooks: Vec<AssistantStreamFinishedHook>,
99 pub tool_result_projector: Option<ToolResultProjector>,
100 pub runtime_event_hooks: Vec<PluginLifecycleEventHook>,
101 pub session_config_mutators: Vec<SessionConfigMutator>,
102 pub(crate) plugin_queries: Vec<(PluginOperationDef, PluginQueryHandler)>,
103 pub(crate) plugin_commands: Vec<(PluginOperationDef, PluginCommandHandler)>,
104 pub(crate) plugin_tasks: Vec<(PluginOperationDef, PluginTaskHandler)>,
105 pub turn_context_transforms: Vec<(i32, Arc<dyn TurnContextTransform>)>,
106 pub context_compactors: Vec<(i32, Arc<dyn ContextCompactor>)>,
107}
108
109impl PluginSpec {
110 pub fn new() -> Self {
111 Self::default()
112 }
113
114 pub fn with_tool_provider(mut self, provider: Arc<dyn ToolProvider>) -> Self {
115 self.tool_providers.push(provider);
116 self
117 }
118
119 pub fn with_trigger_event(mut self, event: crate::TriggerEvent) -> Self {
120 self.triggers.push(event);
121 self
122 }
123
124 pub fn with_prompt_contributor(mut self, contributor: PromptContributor) -> Self {
125 self.prompt_contributors.push(contributor);
126 self
127 }
128
129 pub fn with_tool_catalog_contributor(mut self, contributor: ToolCatalogContributor) -> Self {
130 self.tool_catalog_contributors.push(contributor);
131 self
132 }
133
134 pub fn with_before_turn(mut self, hook: BeforeTurnHook) -> Self {
135 self.before_turn_hooks.push(hook);
136 self
137 }
138
139 pub fn with_before_tool_call(mut self, hook: BeforeToolCallHook) -> Self {
140 self.before_tool_call_hooks.push(hook);
141 self
142 }
143
144 pub fn with_after_tool_call(mut self, hook: AfterToolCallHook) -> Self {
145 self.after_tool_call_hooks.push(hook);
146 self
147 }
148
149 pub fn with_after_turn(mut self, hook: AfterTurnHook) -> Self {
150 self.after_turn_hooks.push(hook);
151 self
152 }
153
154 pub fn with_checkpoint(mut self, hook: CheckpointHook) -> Self {
155 self.checkpoint_hooks.push(hook);
156 self
157 }
158
159 pub fn with_assistant_stream(mut self, hook: AssistantStreamHook) -> Self {
160 self.assistant_stream_hooks.push(hook);
161 self
162 }
163
164 pub fn with_assistant_response(mut self, hook: AssistantResponseHook) -> Self {
165 self.assistant_response_hooks.push(hook);
166 self
167 }
168
169 pub fn with_assistant_stream_finished(mut self, hook: AssistantStreamFinishedHook) -> Self {
170 self.assistant_stream_finished_hooks.push(hook);
171 self
172 }
173
174 pub fn with_tool_result_projector(mut self, projector: ToolResultProjector) -> Self {
175 self.tool_result_projector = Some(projector);
176 self
177 }
178
179 pub fn with_runtime_event(mut self, hook: PluginLifecycleEventHook) -> Self {
180 self.runtime_event_hooks.push(hook);
181 self
182 }
183
184 pub fn with_session_config_mutator(mut self, hook: SessionConfigMutator) -> Self {
185 self.session_config_mutators.push(hook);
186 self
187 }
188
189 pub(crate) fn with_plugin_query(
190 mut self,
191 def: PluginOperationDef,
192 handler: PluginQueryHandler,
193 ) -> Self {
194 self.plugin_queries.push((def, handler));
195 self
196 }
197
198 pub fn with_plugin_query_typed<Op, F, Fut>(self, handler: F) -> Self
199 where
200 Op: PluginQuery,
201 F: Fn(super::PluginQueryContext, Op::Args) -> Fut + Send + Sync + 'static,
202 Fut: std::future::Future<Output = Result<Op::Output, PluginOperationFailure>>
203 + Send
204 + 'static,
205 {
206 self.with_plugin_query(
207 super::plugin_operation_def::<Op>(PluginOperationKind::Query),
208 Arc::new(move |ctx, args| {
209 let parsed = serde_json::from_value::<Op::Args>(args);
210 match parsed {
211 Ok(args) => {
212 let fut = handler(ctx, args);
213 Box::pin(async move {
214 let output = fut.await?;
215 serde_json::to_value(output).map_err(|err| {
216 PluginOperationFailure::new(format!(
217 "failed to serialize {} output: {err}",
218 Op::NAME
219 ))
220 })
221 }) as PluginQueryInvokeFuture
222 }
223 Err(err) => Box::pin(async move {
224 Err(PluginOperationFailure::new(format!(
225 "invalid {} args: {err}",
226 Op::NAME
227 )))
228 }) as PluginQueryInvokeFuture,
229 }
230 }),
231 )
232 }
233
234 pub(crate) fn with_plugin_command(
235 mut self,
236 def: PluginOperationDef,
237 handler: PluginCommandHandler,
238 ) -> Self {
239 self.plugin_commands.push((def, handler));
240 self
241 }
242
243 pub fn with_plugin_command_typed<Op, F, Fut>(self, handler: F) -> Self
244 where
245 Op: PluginCommand,
246 F: Fn(super::PluginCommandContext, Op::Args) -> Fut + Send + Sync + 'static,
247 Fut: std::future::Future<
248 Output = Result<PluginCommandOutcome<Op::Output>, PluginOperationFailure>,
249 > + Send
250 + 'static,
251 {
252 self.with_plugin_command(
253 super::plugin_operation_def::<Op>(PluginOperationKind::Command),
254 Arc::new(move |ctx, args| {
255 let parsed = serde_json::from_value::<Op::Args>(args);
256 match parsed {
257 Ok(args) => {
258 let fut = handler(ctx, args);
259 Box::pin(async move {
260 let outcome = fut.await?;
261 let output = serde_json::to_value(outcome.output).map_err(|err| {
262 PluginOperationFailure::new(format!(
263 "failed to serialize {} output: {err}",
264 Op::NAME
265 ))
266 })?;
267 Ok(super::actions::ErasedPluginCommandOutcome {
268 output,
269 events: outcome.events,
270 directives: outcome.directives,
271 })
272 }) as PluginCommandInvokeFuture
273 }
274 Err(err) => Box::pin(async move {
275 Err(PluginOperationFailure::new(format!(
276 "invalid {} args: {err}",
277 Op::NAME
278 )))
279 }) as PluginCommandInvokeFuture,
280 }
281 }),
282 )
283 }
284
285 pub fn with_plugin_command_value<Op, F, Fut>(self, handler: F) -> Self
286 where
287 Op: PluginCommand,
288 F: Fn(super::PluginCommandContext, Op::Args) -> Fut + Send + Sync + 'static,
289 Fut: std::future::Future<Output = Result<Op::Output, PluginOperationFailure>>
290 + Send
291 + 'static,
292 {
293 self.with_plugin_command_typed::<Op, _, _>(move |ctx, args| {
294 let fut = handler(ctx, args);
295 async move { fut.await.map(PluginCommandOutcome::new) }
296 })
297 }
298
299 pub(crate) fn with_plugin_task(
300 mut self,
301 def: PluginOperationDef,
302 handler: PluginTaskHandler,
303 ) -> Self {
304 self.plugin_tasks.push((def, handler));
305 self
306 }
307
308 pub fn with_plugin_task_typed<Op, F, Fut>(self, handler: F) -> Self
309 where
310 Op: PluginTask,
311 F: Fn(super::PluginTaskContext, Op::Args) -> Fut + Send + Sync + 'static,
312 Fut: std::future::Future<
313 Output = Result<PluginTaskOutcome<Op::Output>, PluginOperationFailure>,
314 > + Send
315 + 'static,
316 {
317 self.with_plugin_task(
318 super::plugin_operation_def::<Op>(PluginOperationKind::Task),
319 Arc::new(move |ctx, args| {
320 let parsed = serde_json::from_value::<Op::Args>(args);
321 match parsed {
322 Ok(args) => {
323 let fut = handler(ctx, args);
324 Box::pin(async move {
325 let outcome = fut.await?;
326 let output = serde_json::to_value(outcome.output).map_err(|err| {
327 PluginOperationFailure::new(format!(
328 "failed to serialize {} output: {err}",
329 Op::NAME
330 ))
331 })?;
332 Ok(super::actions::ErasedPluginTaskOutcome {
333 output,
334 events: outcome.events,
335 directives: outcome.directives,
336 })
337 }) as PluginTaskInvokeFuture
338 }
339 Err(err) => Box::pin(async move {
340 Err(PluginOperationFailure::new(format!(
341 "invalid {} args: {err}",
342 Op::NAME
343 )))
344 }) as PluginTaskInvokeFuture,
345 }
346 }),
347 )
348 }
349
350 pub fn with_plugin_task_value<Op, F, Fut>(self, handler: F) -> Self
351 where
352 Op: PluginTask,
353 F: Fn(super::PluginTaskContext, Op::Args) -> Fut + Send + Sync + 'static,
354 Fut: std::future::Future<Output = Result<Op::Output, PluginOperationFailure>>
355 + Send
356 + 'static,
357 {
358 self.with_plugin_task_typed::<Op, _, _>(move |ctx, args| {
359 let fut = handler(ctx, args);
360 async move { fut.await.map(PluginTaskOutcome::new) }
361 })
362 }
363
364 pub fn with_turn_context_transform(
365 mut self,
366 priority: i32,
367 transform: Arc<dyn TurnContextTransform>,
368 ) -> Self {
369 self.turn_context_transforms.push((priority, transform));
370 self
371 }
372
373 pub fn with_context_compactor(
374 mut self,
375 priority: i32,
376 compactor: Arc<dyn ContextCompactor>,
377 ) -> Self {
378 self.context_compactors.push((priority, compactor));
379 self
380 }
381}
382
383#[derive(Clone, Debug)]
384pub struct PluginSessionContext {
385 pub session_id: String,
386 pub tool_access: SessionToolAccess,
387 pub subagent: Option<SubagentSessionContext>,
388 pub plugin_options: PluginOptions,
389 pub extensions: PluginExtensions,
390 pub parent_session_id: Option<String>,
395}
396
397impl PluginSessionContext {
398 pub fn is_root_session(&self) -> bool {
402 self.parent_session_id.is_none()
403 }
404}
405
406#[derive(Clone)]
407pub struct SessionReadyContext {
408 pub session_id: String,
409 pub host: PluginHost,
410}
411
412pub trait SessionPlugin: Send + Sync {
413 fn id(&self) -> &'static str;
414
415 fn version(&self) -> &'static str {
416 "1"
417 }
418
419 fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError>;
420
421 fn snapshot(
422 &self,
423 _writer: &mut dyn SnapshotWriter,
424 ) -> Result<PluginSnapshotMeta, PluginError> {
425 Ok(PluginSnapshotMeta {
426 plugin_id: self.id().to_string(),
427 plugin_version: self.version().to_string(),
428 revision: self.snapshot_revision(),
429 state: None,
430 })
431 }
432
433 fn snapshot_revision(&self) -> u64 {
434 0
435 }
436
437 fn restore(
438 &self,
439 _meta: &PluginSnapshotMeta,
440 _reader: &dyn SnapshotReader,
441 ) -> Result<(), PluginError> {
442 Ok(())
443 }
444
445 fn session_ready(&self, _ctx: SessionReadyContext) -> Result<(), PluginError> {
446 Ok(())
447 }
448}
449
450pub trait PluginFactory: Send + Sync {
499 fn id(&self) -> &'static str;
500
501 fn extension_contributions(&self) -> Vec<PluginExtensionContribution> {
502 Vec::new()
503 }
504
505 fn process_engine_contributions(
517 &self,
518 _ctx: &ProcessEngineContributionContext<'_>,
519 ) -> Result<Vec<Arc<dyn crate::ProcessEngine>>, PluginError> {
520 Ok(Vec::new())
521 }
522
523 fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError>;
526}
527
528pub struct ProcessEngineContributionContext<'a> {
535 extensions: &'a PluginExtensions,
536 trace_context: &'a crate::TraceContext,
537 process_lifecycle_available: bool,
538}
539
540impl<'a> ProcessEngineContributionContext<'a> {
541 pub fn new(
542 extensions: &'a PluginExtensions,
543 trace_context: &'a crate::TraceContext,
544 process_lifecycle_available: bool,
545 ) -> Self {
546 Self {
547 extensions,
548 trace_context,
549 process_lifecycle_available,
550 }
551 }
552
553 pub fn extensions(&self) -> &PluginExtensions {
554 self.extensions
555 }
556
557 pub fn trace_context(&self) -> &crate::TraceContext {
558 self.trace_context
559 }
560
561 pub fn process_lifecycle_available(&self) -> bool {
562 self.process_lifecycle_available
563 }
564}
565
566pub type PluginSpecBuilder =
567 Arc<dyn Fn(&PluginSessionContext) -> Result<PluginSpec, PluginError> + Send + Sync>;
568
569pub struct PluginSpecFactory {
570 id: &'static str,
571 builder: PluginSpecBuilder,
572}
573
574impl PluginSpecFactory {
575 pub fn new(id: &'static str, builder: PluginSpecBuilder) -> Self {
576 Self { id, builder }
577 }
578}
579
580pub struct StaticPluginFactory {
581 id: &'static str,
582 spec: PluginSpec,
583}
584
585impl StaticPluginFactory {
586 pub fn new(id: &'static str, spec: PluginSpec) -> Self {
587 Self { id, spec }
588 }
589}
590
591struct SpecPlugin {
592 id: &'static str,
593 spec: PluginSpec,
594}
595
596impl PluginFactory for PluginSpecFactory {
597 fn id(&self) -> &'static str {
598 self.id
599 }
600
601 fn build(&self, ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
602 Ok(Arc::new(SpecPlugin {
603 id: self.id,
604 spec: (self.builder)(ctx)?,
605 }))
606 }
607}
608
609impl PluginFactory for StaticPluginFactory {
610 fn id(&self) -> &'static str {
611 self.id
612 }
613
614 fn build(&self, _ctx: &PluginSessionContext) -> Result<Arc<dyn SessionPlugin>, PluginError> {
615 Ok(Arc::new(SpecPlugin {
616 id: self.id,
617 spec: self.spec.clone(),
618 }))
619 }
620}
621
622impl SessionPlugin for SpecPlugin {
623 fn id(&self) -> &'static str {
624 self.id
625 }
626
627 fn register(&self, reg: &mut PluginRegistrar) -> Result<(), PluginError> {
628 for provider in &self.spec.tool_providers {
629 reg.tools().provider(Arc::clone(provider))?;
630 }
631 for event in &self.spec.triggers {
632 reg.triggers().declare(event.clone())?;
633 }
634 for contributor in &self.spec.prompt_contributors {
635 reg.prompt().contribute(Arc::clone(contributor));
636 }
637 for contributor in &self.spec.tool_catalog_contributors {
638 reg.tool_catalog().contribute(Arc::clone(contributor));
639 }
640 for hook in &self.spec.before_turn_hooks {
641 reg.turn().before(Arc::clone(hook));
642 }
643 for hook in &self.spec.before_tool_call_hooks {
644 reg.tool_calls().before(Arc::clone(hook));
645 }
646 for hook in &self.spec.after_tool_call_hooks {
647 reg.tool_calls().after(Arc::clone(hook));
648 }
649 for hook in &self.spec.after_turn_hooks {
650 reg.turn().after(Arc::clone(hook));
651 }
652 for hook in &self.spec.checkpoint_hooks {
653 reg.turn().checkpoint(Arc::clone(hook));
654 }
655 for hook in &self.spec.assistant_stream_hooks {
656 reg.output().stream(Arc::clone(hook));
657 }
658 for hook in &self.spec.assistant_response_hooks {
659 reg.output().response(Arc::clone(hook));
660 }
661 for hook in &self.spec.assistant_stream_finished_hooks {
662 reg.output().stream_finished(Arc::clone(hook));
663 }
664 if let Some(projector) = &self.spec.tool_result_projector {
665 reg.tool_results().projector(Arc::clone(projector))?;
666 }
667 for hook in &self.spec.runtime_event_hooks {
668 reg.session().on_event(Arc::clone(hook));
669 }
670 for hook in &self.spec.session_config_mutators {
671 reg.session().config_mutator(Arc::clone(hook));
672 }
673 for (def, handler) in &self.spec.plugin_queries {
674 reg.operations().query(def.clone(), Arc::clone(handler))?;
675 }
676 for (def, handler) in &self.spec.plugin_commands {
677 reg.operations().command(def.clone(), Arc::clone(handler))?;
678 }
679 for (def, handler) in &self.spec.plugin_tasks {
680 reg.operations().task(def.clone(), Arc::clone(handler))?;
681 }
682 for (priority, transform) in &self.spec.turn_context_transforms {
683 reg.context().prepare_turn(*priority, Arc::clone(transform));
684 }
685 for (priority, compactor) in &self.spec.context_compactors {
686 reg.context().compact(*priority, Arc::clone(compactor));
687 }
688 Ok(())
689 }
690}