// lash_core/plugin/runtime_impl.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3use std::sync::{Mutex as StdMutex, Weak};
4
5use super::*;
6
7#[derive(Clone)]
8pub struct PluginHost {
9    factories: Arc<Vec<Arc<dyn PluginFactory>>>,
10    background_tasks_available: bool,
11    sessions: Arc<StdMutex<BTreeMap<String, Weak<PluginSession>>>>,
12}
13
14struct BuildPluginSessionRequest<'a> {
15    session_id: String,
16    parent_session_id: Option<String>,
17    execution_mode: ExecutionMode,
18    standard_context_approach: Option<crate::StandardContextApproach>,
19    snapshot: Option<&'a PluginSessionSnapshot>,
20    tool_surface_overlay: ToolSurfaceContribution,
21    tool_snapshot: Option<crate::ToolState>,
22    authority: SessionAuthorityContext,
23}
24
25#[derive(Clone, Debug, Default)]
26pub struct SessionAuthorityContext {
27    pub tool_access: SessionToolAccess,
28    pub subagent: Option<SubagentSessionAuthority>,
29}
30
31impl PluginHost {
32    pub fn empty() -> Self {
33        Self::new(Vec::new())
34    }
35
36    pub fn new(factories: Vec<Arc<dyn PluginFactory>>) -> Self {
37        let override_ids: BTreeSet<&'static str> =
38            factories.iter().map(|factory| factory.id()).collect();
39        let mut all_factories = super::builtin_plugin_factories();
40        if !override_ids.is_empty() {
41            all_factories.retain(|factory| !override_ids.contains(factory.id()));
42        }
43        all_factories.extend(factories);
44        Self {
45            factories: Arc::new(all_factories),
46            background_tasks_available: false,
47            sessions: Arc::new(StdMutex::new(BTreeMap::new())),
48        }
49    }
50
51    pub fn with_background_tasks(mut self) -> Self {
52        self.background_tasks_available = true;
53        self
54    }
55
56    pub fn with_background_tasks_available(mut self, available: bool) -> Self {
57        self.background_tasks_available = available;
58        self
59    }
60
61    pub fn isolated_registry(&self) -> Self {
62        Self {
63            factories: Arc::clone(&self.factories),
64            background_tasks_available: self.background_tasks_available,
65            sessions: Arc::new(StdMutex::new(BTreeMap::new())),
66        }
67    }
68
69    pub fn factories(&self) -> &[Arc<dyn PluginFactory>] {
70        self.factories.as_ref().as_slice()
71    }
72
73    pub fn supports_standard_context_approach(
74        &self,
75        standard_context_approach: &crate::StandardContextApproach,
76    ) -> bool {
77        let required = standard_context_approach.kind();
78        self.factories().iter().any(|factory| {
79            factory
80                .supported_standard_context_approaches()
81                .contains(&required)
82        })
83    }
84
85    pub fn build_standard_session(
86        &self,
87        session_id: impl Into<String>,
88        snapshot: Option<&PluginSessionSnapshot>,
89    ) -> Result<Arc<PluginSession>, PluginError> {
90        self.build_session(
91            session_id,
92            ExecutionMode::standard(),
93            Some(crate::StandardContextApproach::default()),
94            snapshot,
95        )
96    }
97
98    pub fn build_session(
99        &self,
100        session_id: impl Into<String>,
101        execution_mode: ExecutionMode,
102        standard_context_approach: Option<crate::StandardContextApproach>,
103        snapshot: Option<&PluginSessionSnapshot>,
104    ) -> Result<Arc<PluginSession>, PluginError> {
105        self.build_session_with_surface(
106            session_id,
107            execution_mode,
108            standard_context_approach,
109            snapshot,
110            ToolSurfaceContribution::default(),
111            None,
112        )
113    }
114
115    /// Variant of [`build_session`] that records the caller as the
116    /// parent of the new session. Plugin factories read
117    /// [`PluginSessionContext::is_root_session`] to gate root-only
118    /// behavior; anything that goes through the plain `build_session`
119    /// is treated as a root session by default.
120    pub fn build_session_with_parent(
121        &self,
122        session_id: impl Into<String>,
123        parent_session_id: Option<String>,
124        execution_mode: ExecutionMode,
125        standard_context_approach: Option<crate::StandardContextApproach>,
126        snapshot: Option<&PluginSessionSnapshot>,
127        authority: SessionAuthorityContext,
128    ) -> Result<Arc<PluginSession>, PluginError> {
129        self.build_session_with_parent_and_surface(
130            session_id,
131            parent_session_id,
132            execution_mode,
133            standard_context_approach,
134            snapshot,
135            ToolSurfaceContribution::default(),
136            None,
137            authority,
138        )
139    }
140
141    #[expect(
142        clippy::too_many_arguments,
143        reason = "public plugin-host boundary keeps parent, snapshot, tool overlay, and authority explicit"
144    )]
145    pub fn build_session_with_parent_and_surface(
146        &self,
147        session_id: impl Into<String>,
148        parent_session_id: Option<String>,
149        execution_mode: ExecutionMode,
150        standard_context_approach: Option<crate::StandardContextApproach>,
151        snapshot: Option<&PluginSessionSnapshot>,
152        tool_surface_overlay: ToolSurfaceContribution,
153        tool_snapshot: Option<crate::ToolState>,
154        authority: SessionAuthorityContext,
155    ) -> Result<Arc<PluginSession>, PluginError> {
156        self.build_session_inner(BuildPluginSessionRequest {
157            session_id: session_id.into(),
158            parent_session_id,
159            execution_mode,
160            standard_context_approach,
161            snapshot,
162            tool_surface_overlay,
163            tool_snapshot,
164            authority,
165        })
166    }
167
168    pub fn build_session_with_surface(
169        &self,
170        session_id: impl Into<String>,
171        execution_mode: ExecutionMode,
172        standard_context_approach: Option<crate::StandardContextApproach>,
173        snapshot: Option<&PluginSessionSnapshot>,
174        tool_surface_overlay: ToolSurfaceContribution,
175        tool_snapshot: Option<crate::ToolState>,
176    ) -> Result<Arc<PluginSession>, PluginError> {
177        self.build_session_inner(BuildPluginSessionRequest {
178            session_id: session_id.into(),
179            parent_session_id: None,
180            execution_mode,
181            standard_context_approach,
182            snapshot,
183            tool_surface_overlay,
184            tool_snapshot,
185            authority: SessionAuthorityContext::default(),
186        })
187    }
188
189    fn build_session_inner(
190        &self,
191        request: BuildPluginSessionRequest<'_>,
192    ) -> Result<Arc<PluginSession>, PluginError> {
193        let BuildPluginSessionRequest {
194            session_id,
195            parent_session_id,
196            execution_mode,
197            standard_context_approach,
198            snapshot,
199            tool_surface_overlay,
200            tool_snapshot,
201            authority,
202        } = request;
203        if execution_mode == ExecutionMode::standard() {
204            let approach = standard_context_approach.as_ref().ok_or_else(|| {
205                PluginError::Registration(
206                    "standard execution requires a standard context approach".to_string(),
207                )
208            })?;
209            if matches!(
210                approach,
211                crate::StandardContextApproach::ObservationalMemory(_)
212            ) && !self.supports_standard_context_approach(approach)
213            {
214                return Err(PluginError::Registration(format!(
215                    "standard context approach `{:?}` requires a supporting plugin factory on this plugin host",
216                    approach.kind()
217                )));
218            }
219        } else if standard_context_approach.is_some() {
220            return Err(PluginError::Registration(
221                "standard context approach only applies to standard execution mode".to_string(),
222            ));
223        }
224        let ctx = PluginSessionContext {
225            session_id,
226            execution_mode: execution_mode.clone(),
227            standard_context_approach: standard_context_approach.clone(),
228            tool_access: authority.tool_access.clone(),
229            subagent: authority.subagent.clone(),
230            background_tasks_available: self.background_tasks_available,
231            parent_session_id,
232        };
233        let session_id = ctx.session_id.clone();
234        let mut plugins = Vec::new();
235        let mut reg = PluginRegistrar::new();
236        for factory in self.factories() {
237            let plugin = factory.build(&ctx)?;
238            reg.registering_plugin_id = Some(plugin.id().to_string());
239            plugin.register(&mut reg)?;
240            reg.registering_plugin_id = None;
241            plugins.push(plugin);
242        }
243        let mode_session = reg
244            .mode_session
245            .take()
246            .ok_or_else(|| {
247                PluginError::Registration(format!(
248                    "missing mode session capability for {:?}",
249                    execution_mode
250                ))
251            })?
252            .hook;
253        let mode_native_tools = reg
254            .mode_native_tools
255            .into_iter()
256            .map(|entry| entry.hook)
257            .collect::<Vec<_>>();
258        let mode_protocol_driver = reg.mode_protocol_driver.take().map(|entry| entry.hook);
259        for provider in &mode_native_tools {
260            for manifest in provider.tool_manifests() {
261                if !reg.tool_names.insert(manifest.name.clone()) {
262                    return Err(PluginError::Registration(format!(
263                        "duplicate mode native tool name `{}`",
264                        manifest.name
265                    )));
266                }
267            }
268        }
269        let base_tools: Arc<dyn ToolProvider> = Arc::new(
270            crate::tool_provider::CompositeToolProvider::from_providers(reg.tool_providers.clone()),
271        );
272        let registry = match tool_snapshot {
273            Some(snapshot) => Arc::new(
274                crate::ToolRegistry::from_tool_provider(base_tools)
275                    .map_err(|err| {
276                        PluginError::Registration(format!("failed to build tool registry: {err}"))
277                    })?
278                    .fork_with_state(snapshot)
279                    .map_err(|err| {
280                        PluginError::Session(format!(
281                            "tool state cannot be applied to this plugin host session: {err}"
282                        ))
283                    })?,
284            ),
285            None => Arc::new(crate::ToolRegistry::from_tool_provider(base_tools).map_err(
286                |err| PluginError::Registration(format!("failed to build tool registry: {err}")),
287            )?),
288        };
289        let tools = Arc::clone(&registry) as Arc<dyn ToolProvider>;
290
291        let session = Arc::new(PluginSession {
292            host: self.clone(),
293            session_id: ctx.session_id,
294            execution_mode: execution_mode.clone(),
295            plugins,
296            tools,
297            tool_registry: registry,
298            tool_surface_overlay,
299            tool_access: authority.tool_access,
300            subagent: authority.subagent,
301            prompt_contributors: reg.prompt_contributors,
302            tool_surface_contributors: reg.tool_surface_contributors,
303            tool_discovery_contributors: reg.tool_discovery_contributors,
304            before_turn_hooks: reg.before_turn_hooks,
305            before_tool_call_hooks: reg.before_tool_call_hooks,
306            after_tool_call_hooks: reg.after_tool_call_hooks,
307            after_turn_hooks: reg.after_turn_hooks,
308            checkpoint_hooks: reg.checkpoint_hooks,
309            assistant_stream_hooks: reg.assistant_stream_hooks,
310            assistant_response_hooks: reg.assistant_response_hooks,
311            tool_result_projector: reg.tool_result_projector,
312            runtime_event_hooks: reg.runtime_event_hooks,
313            session_config_mutators: reg.session_config_mutators,
314            plugin_actions: reg.plugin_actions,
315            monitor_specs: reg.monitor_specs,
316            turn_context_transforms: {
317                let mut list = reg.turn_context_transforms;
318                list.sort_by_key(|entry| std::cmp::Reverse(entry.0));
319                list.into_iter().map(|(_, t)| t).collect()
320            },
321            history_rewriters: {
322                let mut list = reg.history_rewriters;
323                list.sort_by_key(|entry| std::cmp::Reverse(entry.0));
324                list.into_iter().map(|(_, r)| r).collect()
325            },
326            mode_session,
327            mode_native_tools,
328            mode_protocol_driver,
329        });
330        self.register_session(&session_id, &session)?;
331        let ready = SessionReadyContext {
332            session_id: session.session_id.clone(),
333            execution_mode,
334            standard_context_approach,
335            host: self.clone(),
336        };
337        for plugin in &session.plugins {
338            plugin.session_ready(ready.clone())?;
339        }
340        if let Some(snapshot) = snapshot {
341            session.restore(snapshot)?;
342        }
343        Ok(session)
344    }
345
346    pub async fn invoke_plugin_action_sessionless(
347        &self,
348        name: &str,
349        args: serde_json::Value,
350    ) -> Result<ToolResult, PluginError> {
351        let session = self.build_standard_session(
352            format!("__external__-{}", uuid::Uuid::new_v4().simple()),
353            None,
354        )?;
355        session
356            .invoke_plugin_action(name, args, None, false, Arc::new(NoopSessionManager))
357            .await
358            .map_err(|err| PluginError::Invoke(err.to_string()))
359    }
360
361    fn register_session(
362        &self,
363        session_id: &str,
364        session: &Arc<PluginSession>,
365    ) -> Result<(), PluginError> {
366        let mut sessions = self.sessions.lock().map_err(|_| {
367            PluginError::Session("plugin host session registry poisoned".to_string())
368        })?;
369        if let Some(existing) = sessions.get(session_id).and_then(Weak::upgrade) {
370            if !Arc::ptr_eq(&existing, session) {
371                return Err(PluginError::Session(format!(
372                    "session `{session_id}` is already registered on this plugin host"
373                )));
374            }
375            return Ok(());
376        }
377        sessions.insert(session_id.to_string(), Arc::downgrade(session));
378        Ok(())
379    }
380
381    pub fn unregister_session(&self, session_id: &str) -> Result<(), PluginError> {
382        let mut sessions = self.sessions.lock().map_err(|_| {
383            PluginError::Session("plugin host session registry poisoned".to_string())
384        })?;
385        sessions.remove(session_id);
386        Ok(())
387    }
388
389    pub fn session(&self, session_id: &str) -> Result<Arc<PluginSession>, PluginActionInvokeError> {
390        let mut sessions = self
391            .sessions
392            .lock()
393            .map_err(|_| PluginActionInvokeError::SessionRegistryPoisoned)?;
394        let Some(weak) = sessions.get(session_id).cloned() else {
395            return Err(PluginActionInvokeError::UnknownSession(
396                session_id.to_string(),
397            ));
398        };
399        match weak.upgrade() {
400            Some(session) => Ok(session),
401            None => {
402                sessions.remove(session_id);
403                Err(PluginActionInvokeError::UnknownSession(
404                    session_id.to_string(),
405                ))
406            }
407        }
408    }
409
410    pub async fn invoke_plugin_action_for_session(
411        &self,
412        session_id: &str,
413        name: &str,
414        args: serde_json::Value,
415        host: Arc<dyn RuntimeSessionHost>,
416    ) -> Result<ToolResult, PluginActionInvokeError> {
417        let session = self.session(session_id)?;
418        session
419            .invoke_plugin_action(name, args, Some(session_id.to_string()), false, host)
420            .await
421    }
422
423    pub fn monitor_specs_for_session(
424        &self,
425        session_id: &str,
426    ) -> Result<Vec<crate::PluginOwned<crate::MonitorSpec>>, PluginActionInvokeError> {
427        Ok(self.session(session_id)?.monitor_specs().to_vec())
428    }
429}