Skip to main content

lash/
admin.rs

1pub(crate) use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let drivers = self.core.work_driver.drivers().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            self.core.env.process_registry.clone(),
47            drivers.process,
48        );
49        router
50            .emit(request, scoped_effect_controller.controller())
51            .await
52            .map_err(Into::into)
53    }
54
55    pub async fn subscriptions(
56        &self,
57        filter: lash_core::TriggerSubscriptionFilter,
58    ) -> Result<Vec<lash_core::TriggerRegistration>> {
59        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60            EmbedError::Plugin(lash_core::PluginError::Session(
61                "trigger store is unavailable in this runtime".to_string(),
62            ))
63        })?;
64        let records = store.list_subscriptions(filter).await?;
65        Ok(records
66            .iter()
67            .map(lash_core::TriggerRegistration::from)
68            .collect())
69    }
70}
71
72#[derive(Clone)]
73pub struct SessionAdmin {
74    pub(crate) runtime: RuntimeHandle,
75}
76
77impl SessionAdmin {
78    pub fn config(&self) -> SessionConfigAdmin {
79        SessionConfigAdmin {
80            control: self.clone(),
81        }
82    }
83
84    pub fn tools(&self) -> ToolAdmin {
85        ToolAdmin {
86            control: self.clone(),
87        }
88    }
89
90    pub fn commands(&self) -> SessionCommandAdmin {
91        SessionCommandAdmin {
92            control: self.clone(),
93        }
94    }
95
96    pub fn triggers(&self) -> SessionTriggerAdmin {
97        SessionTriggerAdmin {
98            control: self.clone(),
99        }
100    }
101
102    pub fn state(&self) -> SessionStateAdmin {
103        SessionStateAdmin {
104            control: self.clone(),
105        }
106    }
107
108    pub fn children(&self) -> ChildSessionAdmin {
109        ChildSessionAdmin {
110            control: self.clone(),
111        }
112    }
113
114    pub fn injection(&self) -> InjectionAdmin {
115        InjectionAdmin {
116            control: self.clone(),
117        }
118    }
119
120    pub fn protocol(&self) -> ProtocolAdmin {
121        ProtocolAdmin {
122            control: self.clone(),
123        }
124    }
125
126    pub fn processes(&self) -> SessionProcessAdmin {
127        SessionProcessAdmin {
128            control: self.clone(),
129        }
130    }
131
132    /// Run `f` against the locked runtime writer, then publish the resulting
133    /// observation. The body is the canonical `lock → call → publish_from`
134    /// stamp shared by nearly every mutating control method; publish happens
135    /// unconditionally once the closure returns.
136    async fn with_writer<F, T>(&self, f: F) -> T
137    where
138        F: AsyncFnOnce(&mut LashRuntime) -> T,
139    {
140        let writer = self.runtime.writer();
141        let mut runtime = writer.lock().await;
142        let value = f(&mut runtime).await;
143        self.runtime.publish_from(&runtime);
144        value
145    }
146
147    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
148        self.with_writer(async |runtime: &mut LashRuntime| {
149            runtime
150                .update_session_config(patch.provider, patch.model, patch.prompt)
151                .await;
152        })
153        .await;
154        Ok(())
155    }
156
157    async fn export_state(&self) -> lash_core::SessionSnapshot {
158        self.runtime.observe().read_view.to_snapshot()
159    }
160
161    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
162        self.with_writer(async |runtime: &mut LashRuntime| {
163            runtime
164                .append_session_nodes(lash_core::AppendSessionNodesRequest {
165                    nodes: messages
166                        .into_iter()
167                        .map(lash_core::SessionAppendNode::message)
168                        .collect(),
169                    requires_ancestor_node_id: None,
170                })
171                .await
172                .map(|_| ())
173                .map_err(Into::into)
174        })
175        .await
176    }
177
178    async fn append_plugin_body(
179        &self,
180        plugin_type: impl Into<String>,
181        body: serde_json::Value,
182    ) -> Result<()> {
183        self.with_writer(async |runtime: &mut LashRuntime| {
184            runtime
185                .append_session_nodes(lash_core::AppendSessionNodesRequest {
186                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
187                    requires_ancestor_node_id: None,
188                })
189                .await
190                .map(|_| ())
191                .map_err(Into::into)
192        })
193        .await
194    }
195
196    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
197        self.with_writer(async |runtime: &mut LashRuntime| {
198            runtime.set_persisted_state(state).map_err(Into::into)
199        })
200        .await
201    }
202
203    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
204        self.with_writer(async |runtime: &mut LashRuntime| {
205            runtime.set_prompt_template(template).await;
206        })
207        .await;
208        Ok(())
209    }
210
211    async fn clear_prompt_template(&self) -> Result<()> {
212        self.with_writer(async |runtime: &mut LashRuntime| {
213            runtime.clear_prompt_template().await;
214        })
215        .await;
216        Ok(())
217    }
218
219    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
220        self.with_writer(async |runtime: &mut LashRuntime| {
221            runtime.add_prompt_contribution(contribution).await;
222        })
223        .await;
224        Ok(())
225    }
226
227    async fn replace_prompt_slot(
228        &self,
229        slot: PromptSlot,
230        contributions: impl IntoIterator<Item = PromptContribution>,
231    ) -> Result<()> {
232        self.with_writer(async |runtime: &mut LashRuntime| {
233            runtime.replace_prompt_slot(slot, contributions).await;
234        })
235        .await;
236        Ok(())
237    }
238
239    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
240        self.with_writer(async |runtime: &mut LashRuntime| {
241            runtime.clear_prompt_slot(slot).await;
242        })
243        .await;
244        Ok(())
245    }
246
247    async fn apply_protocol_session_extension(
248        &self,
249        extension: lash_core::ProtocolSessionExtensionHandle,
250    ) -> Result<()> {
251        self.with_writer(async |runtime: &mut LashRuntime| {
252            runtime
253                .apply_protocol_session_extension(extension)
254                .await
255                .map_err(Into::into)
256        })
257        .await
258    }
259
260    async fn branch_to_node(
261        &self,
262        target_leaf: Option<String>,
263    ) -> Result<lash_core::SessionSnapshot> {
264        self.with_writer(async |runtime: &mut LashRuntime| {
265            runtime
266                .branch_to_node(target_leaf)
267                .await
268                .map_err(Into::into)
269        })
270        .await
271    }
272
273    /// Refresh the session graph from any background process that signalled it
274    /// changed. This is the honest name for what the core `await_background_work`
275    /// call does — a session-graph resync, **not** a terminal wait on background
276    /// work (that lives on the process admin's `await_output`). Renamed off the
277    /// old `SessionProcessAdmin::await_all` misnomer per the ADR 0019 grill.
278    pub(crate) async fn refresh_background_graph(&self) -> Result<()> {
279        self.with_writer(async |runtime: &mut LashRuntime| {
280            runtime.await_background_work().await.map_err(Into::into)
281        })
282        .await
283    }
284
285    fn process_registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
286        self.runtime
287            .observe()
288            .process_registry
289            .clone()
290            .ok_or_else(|| {
291                EmbedError::Plugin(lash_core::PluginError::Session(
292                    "process registry is unavailable in this runtime".to_string(),
293                ))
294            })
295    }
296
297    fn process_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
298        Ok(lash_core::ProcessWorkObserver::new(
299            self.process_registry()?,
300        ))
301    }
302
303    /// An observer over the session's process registry, or `None` when this
304    /// runtime has no registry. Session-scoped reads use this so a registry-less
305    /// runtime observes an empty process set rather than erroring, matching the
306    /// pre-unification `list_process_handles` behavior.
307    fn process_observer_opt(&self) -> Option<lash_core::ProcessWorkObserver> {
308        self.runtime
309            .observe()
310            .process_registry
311            .clone()
312            .map(lash_core::ProcessWorkObserver::new)
313    }
314
315    /// The session scopes this session may address grants under: its root scope
316    /// plus its current agent frame, deduplicated. Mirrors the scope set the
317    /// runtime observation lists handles across, so the grant-scoped session
318    /// view sees exactly what the session can address.
319    fn process_visible_scopes(&self) -> Vec<lash_core::SessionScope> {
320        let observation = self.runtime.observe();
321        let root = observation.process_scope();
322        let mut scopes = vec![root.clone()];
323        let frame_id = observation.persisted_state.current_agent_frame_id.as_str();
324        if !frame_id.is_empty() {
325            let frame_scope =
326                lash_core::SessionScope::for_agent_frame(observation.session_id(), frame_id);
327            if frame_scope.id() != root.id() {
328                scopes.push(frame_scope);
329            }
330        }
331        scopes
332    }
333
334    async fn signal_process(
335        &self,
336        process_id: &str,
337        signal_name: String,
338        signal_id: String,
339        payload: serde_json::Value,
340        scoped_effect_controller: ScopedEffectController<'_>,
341    ) -> Result<lash_core::ProcessEvent> {
342        let writer = self.runtime.writer();
343        let runtime = writer.lock().await;
344        let session_id = runtime.session_id().to_string();
345        let processes = runtime.process_service()?;
346        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
347        processes
348            .signal(
349                &session_id,
350                process_id,
351                signal_name,
352                signal_id,
353                payload,
354                scope,
355            )
356            .await
357            .map_err(EmbedError::Plugin)
358    }
359
360    async fn transfer_process_handles(
361        &self,
362        to_session_id: &str,
363        process_ids: Vec<String>,
364        scoped_effect_controller: ScopedEffectController<'_>,
365    ) -> Result<()> {
366        let writer = self.runtime.writer();
367        let runtime = writer.lock().await;
368        let session_id = runtime.session_id().to_string();
369        let processes = runtime.process_service()?;
370        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
371        processes
372            .transfer(&session_id, to_session_id, process_ids, scope)
373            .await
374            .map_err(EmbedError::Plugin)
375    }
376
377    async fn await_process_output(
378        &self,
379        process_id: &str,
380    ) -> Result<lash_core::ProcessAwaitOutput> {
381        lash_core::ProcessAwaiter::polling(self.process_registry()?)
382            .await_terminal(process_id)
383            .await
384            .map_err(Into::into)
385    }
386
387    async fn request_process_abandon(
388        &self,
389        process_id: &str,
390        reason: Option<String>,
391    ) -> Result<lash_core::ObservedProcess> {
392        let session_id = self.runtime.observe().session_id().to_string();
393        let request = lash_core::AbandonRequest {
394            requested_by: format!("session:{session_id}"),
395            requested_at_ms: crate::process_admin::now_epoch_ms(),
396            reason,
397        };
398        self.process_registry()?
399            .request_process_abandon(process_id, request)
400            .await?;
401        self.process_observer()?
402            .process(process_id)
403            .await
404            .ok_or_else(|| {
405                EmbedError::Plugin(lash_core::PluginError::Session(format!(
406                    "process `{process_id}` vanished after recording its abandon request"
407                )))
408            })
409    }
410
411    async fn refresh_tool_catalog(&self) -> Result<()> {
412        self.with_writer(async |runtime: &mut LashRuntime| {
413            runtime
414                .refresh_session_tool_catalog()
415                .await
416                .map_err(Into::into)
417        })
418        .await
419    }
420
421    async fn submit_session_command(
422        &self,
423        command: lash_core::SessionCommand,
424        idempotency_key: impl Into<String>,
425    ) -> Result<lash_core::SessionCommandReceipt> {
426        let idempotency_key = idempotency_key.into();
427        self.with_writer(async |runtime: &mut LashRuntime| {
428            runtime
429                .submit_session_command(command, idempotency_key)
430                .await
431                .map_err(Into::into)
432        })
433        .await
434    }
435
436    async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
437        self.with_writer(async |runtime: &mut LashRuntime| {
438            runtime
439                .list_trigger_registrations()
440                .await
441                .map_err(Into::into)
442        })
443        .await
444    }
445
446    async fn trigger_registrations_by_source_type(
447        &self,
448        source_type: impl Into<lash_core::TriggerEventType>,
449    ) -> Result<Vec<lash_core::TriggerRegistration>> {
450        self.with_writer(async |runtime: &mut LashRuntime| {
451            runtime
452                .trigger_registrations_by_source_type(source_type)
453                .await
454                .map_err(Into::into)
455        })
456        .await
457    }
458
459    async fn query_plugin_raw(
460        &self,
461        name: &str,
462        args: serde_json::Value,
463    ) -> Result<(String, serde_json::Value)> {
464        let observation = self.runtime.observe();
465        let session_id = observation.session_id().to_string();
466        observation
467            .query_plugin(name, args, Some(session_id))
468            .await
469            .map_err(Into::into)
470    }
471
472    async fn run_plugin_command_raw(
473        &self,
474        name: &str,
475        args: serde_json::Value,
476    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
477        let session_id = self.runtime.observe().session_id().to_string();
478        let writer = self.runtime.writer();
479        let mut runtime = writer.lock().await;
480        let receipt = runtime
481            .run_plugin_command(name, args, Some(session_id))
482            .await?;
483        self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
484        self.runtime.publish_from(&runtime);
485        Ok(receipt)
486    }
487
488    async fn run_plugin_task_raw_with_cancel(
489        &self,
490        name: &str,
491        args: serde_json::Value,
492        cancellation_token: CancellationToken,
493    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
494        let session_id = self.runtime.observe().session_id().to_string();
495        let writer = self.runtime.writer();
496        let mut runtime = writer.lock().await;
497        let scope_id = format!(
498            "{session_id}:plugin_task:{name}:{}",
499            lash_core::TurnActivityId::fresh().0
500        );
501        let scoped_effect_controller = runtime
502            .effect_host()
503            .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
504            .map_err(EmbedError::Runtime)?
505            .ok_or_else(|| {
506                EmbedError::Plugin(lash_core::PluginError::Session(
507                    "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
508                ))
509            })?;
510        let receipt = runtime
511            .run_plugin_task(
512                name,
513                args,
514                Some(session_id),
515                scoped_effect_controller,
516                cancellation_token,
517            )
518            .await?;
519        self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
520        self.runtime.publish_from(&runtime);
521        Ok(receipt)
522    }
523
524    fn record_plugin_operation_observations(
525        &self,
526        events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
527        pending_turn_inputs: &[lash_core::PendingTurnInput],
528    ) {
529        for owned in events {
530            self.runtime
531                .record_turn_activity(lash_core::TurnActivity::independent(
532                    lash_core::TurnEvent::PluginRuntime {
533                        plugin_id: owned.plugin_id.clone(),
534                        event: owned.value.clone(),
535                    },
536                ));
537        }
538        if !pending_turn_inputs.is_empty() {
539            self.runtime.record_queue_changed(
540                lash_core::SessionQueueEventKind::Enqueued,
541                pending_turn_inputs
542                    .iter()
543                    .map(|input| input.input_id.clone())
544                    .collect(),
545            );
546        }
547    }
548
549    async fn compact_context(
550        &self,
551        instructions: Option<String>,
552        scoped_effect_controller: ScopedEffectController<'_>,
553    ) -> Result<bool> {
554        self.with_writer(async |runtime: &mut LashRuntime| {
555            runtime
556                .compact_context(instructions, scoped_effect_controller)
557                .await
558                .map_err(Into::into)
559        })
560        .await
561    }
562
563    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
564        self.with_writer(async |runtime: &mut LashRuntime| {
565            runtime.await_background_work().await?;
566            Ok(runtime.export_persisted_state())
567        })
568        .await
569    }
570
571    async fn start_process(
572        &self,
573        request: lash_core::ProcessStartRequest,
574        scoped_effect_controller: ScopedEffectController<'_>,
575    ) -> Result<lash_core::ProcessHandleSummary> {
576        let writer = self.runtime.writer();
577        let runtime = writer.lock().await;
578        let session_id = runtime.session_id().to_string();
579        let processes = runtime.process_service()?;
580        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
581        let summary = processes
582            .start_from_request(&session_id, request, scope)
583            .await
584            .map_err(EmbedError::Plugin)?;
585        self.runtime.record_process_changed(
586            SessionProcessEventKind::Started,
587            vec![summary.process_id.clone()],
588        );
589        Ok(summary)
590    }
591
592    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
593        self.runtime
594            .writer()
595            .lock()
596            .await
597            .session_state_service()
598            .map_err(Into::into)
599    }
600
601    async fn cancel_process(
602        &self,
603        process_id: &str,
604        scoped_effect_controller: ScopedEffectController<'_>,
605    ) -> Result<lash_core::ProcessCancelSummary> {
606        let writer = self.runtime.writer();
607        let runtime = writer.lock().await;
608        let session_id = runtime.session_id().to_string();
609        let processes = runtime.process_service()?;
610        let cancel_ability = runtime.process_cancel_ability();
611        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
612        let summary = cancel_ability
613            .cancel_summary(
614                processes.as_ref(),
615                lash_core::ProcessCancelRequest::new(
616                    &session_id,
617                    process_id,
618                    scope,
619                    lash_core::ProcessCancelSource::HostApi,
620                )
621                .with_reason("requested by host API"),
622            )
623            .await
624            .map_err(EmbedError::Plugin)?;
625        self.runtime.record_process_changed(
626            SessionProcessEventKind::Cancelled,
627            vec![summary.process_id.clone()],
628        );
629        Ok(summary)
630    }
631
632    async fn cancel_visible_processes(
633        &self,
634        scoped_effect_controller: ScopedEffectController<'_>,
635    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
636        let writer = self.runtime.writer();
637        let runtime = writer.lock().await;
638        let session_id = runtime.session_id().to_string();
639        let processes = runtime.process_service()?;
640        let cancel_ability = runtime.process_cancel_ability();
641        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
642        let summaries = cancel_ability
643            .cancel_all_visible(
644                processes.as_ref(),
645                lash_core::ProcessCancelAllRequest::new(
646                    &session_id,
647                    scope,
648                    lash_core::ProcessCancelSource::HostApi,
649                )
650                .with_reason("requested by host API"),
651            )
652            .await
653            .map_err(EmbedError::Plugin)?;
654        self.runtime.record_process_changed(
655            SessionProcessEventKind::Cancelled,
656            summaries
657                .iter()
658                .map(|summary| summary.process_id.clone())
659                .collect(),
660        );
661        Ok(summaries)
662    }
663
664    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
665        self.with_writer(async |runtime: &mut LashRuntime| {
666            runtime.snapshot_execution_state().await.map_err(Into::into)
667        })
668        .await
669    }
670
671    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
672        self.with_writer(async |runtime: &mut LashRuntime| {
673            runtime
674                .restore_execution_state(bytes)
675                .await
676                .map_err(Into::into)
677        })
678        .await
679    }
680
681    async fn tool_state(&self) -> Result<ToolState> {
682        self.runtime.observe().tool_state.clone().ok_or_else(|| {
683            EmbedError::Session(SessionError::Protocol(
684                "runtime session not available".to_string(),
685            ))
686        })
687    }
688
689    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
690        self.with_writer(async |runtime: &mut LashRuntime| {
691            runtime
692                .apply_tool_state(state)
693                .await
694                .map_err(EmbedError::from)
695        })
696        .await
697    }
698
699    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
700        self.with_writer(async |runtime: &mut LashRuntime| {
701            runtime
702                .restore_tool_state(state)
703                .await
704                .map_err(EmbedError::from)
705        })
706        .await
707    }
708
709    async fn set_tool_membership(&self, tool_id: lash_core::ToolId, present: bool) -> Result<u64> {
710        self.set_tool_membership_many(&[(tool_id, present)]).await
711    }
712
713    async fn set_tool_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
714        let mut state = self.tool_state().await?;
715        for (tool_id, present) in updates {
716            state
717                .set_membership(tool_id, *present)
718                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
719        }
720        self.apply_tool_state(state).await
721    }
722
723    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
724        Ok(self.tool_state().await?.tool_manifests())
725    }
726
727    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
728        let tool_registry = self.tool_registry().await?;
729        let handle = tool_registry
730            .add_tool_provider(provider)
731            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
732        self.refresh_tool_catalog().await?;
733        Ok(handle)
734    }
735
736    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
737        let tool_registry = self.tool_registry().await?;
738        let generation = tool_registry
739            .remove_source(handle)
740            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
741        self.refresh_tool_catalog().await?;
742        Ok(generation)
743    }
744
745    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
746        let writer = self.runtime.writer();
747        let runtime = writer.lock().await;
748        let lifecycle = runtime.session_lifecycle_service()?;
749        lifecycle.create_session(request).await.map_err(Into::into)
750    }
751
752    async fn close_child_session(&self, session_id: &str) -> Result<()> {
753        let writer = self.runtime.writer();
754        let runtime = writer.lock().await;
755        let lifecycle = runtime.session_lifecycle_service()?;
756        lifecycle
757            .close_session(session_id)
758            .await
759            .map_err(Into::into)
760    }
761
762    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
763        self.with_writer(async |runtime: &mut LashRuntime| {
764            runtime
765                .activate_managed_session(session_id)
766                .await
767                .map_err(Into::into)
768        })
769        .await
770    }
771
772    async fn inject_turn_input(
773        &self,
774        turn_id: &str,
775        id: Option<String>,
776        message: PluginMessage,
777    ) -> Result<()> {
778        self.inject_turn_inputs_for_turn(
779            turn_id,
780            vec![lash_core::InjectedTurnInput { id, message }],
781        )
782        .await
783    }
784
785    async fn inject_turn_inputs_for_turn(
786        &self,
787        turn_id: &str,
788        messages: Vec<lash_core::InjectedTurnInput>,
789    ) -> Result<()> {
790        for input in messages {
791            let source_key = input.id.map(|id| format!("injection:{id}"));
792            let turn_input = turn_input_from_plugin_message(input.message);
793            self.runtime
794                .enqueue_turn_input(
795                    turn_input,
796                    lash_core::TurnInputIngress::active_turn(
797                        turn_id,
798                        lash_core::TurnInputCheckpointBoundary::AfterWork,
799                    ),
800                    source_key,
801                )
802                .await
803                .map(|_| ())
804                .map_err(EmbedError::Runtime)?;
805        }
806        Ok(())
807    }
808
809    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
810        self.runtime
811            .writer()
812            .lock()
813            .await
814            .plugin_session()
815            .map(|session| session.tool_registry())
816            .ok_or_else(|| {
817                EmbedError::Session(SessionError::Protocol(
818                    "tool registry is unavailable in this runtime session".to_string(),
819                ))
820            })
821    }
822}
823
824fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
825    let mut input = TurnInput::empty();
826    if !message.content.is_empty() {
827        input.items.push(InputItem::Text {
828            text: message.content,
829        });
830    }
831    for (index, bytes) in message.images.into_iter().enumerate() {
832        let id = format!("injected-image-{index}");
833        input.items.push(InputItem::ImageRef { id: id.clone() });
834        input.image_blobs.insert(id, bytes);
835    }
836    input
837}
838
839#[derive(Clone)]
840pub struct SessionConfigAdmin {
841    control: SessionAdmin,
842}
843
844impl SessionConfigAdmin {
845    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
846        self.control.update_config(patch).await
847    }
848
849    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
850        self.control.set_prompt_template(template).await
851    }
852
853    pub async fn clear_prompt_template(&self) -> Result<()> {
854        self.control.clear_prompt_template().await
855    }
856
857    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
858        self.control.add_prompt_contribution(contribution).await
859    }
860
861    pub async fn replace_prompt_slot(
862        &self,
863        slot: PromptSlot,
864        contributions: impl IntoIterator<Item = PromptContribution>,
865    ) -> Result<()> {
866        self.control.replace_prompt_slot(slot, contributions).await
867    }
868
869    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
870        self.control.clear_prompt_slot(slot).await
871    }
872}
873
874#[derive(Clone)]
875pub struct ToolAdmin {
876    control: SessionAdmin,
877}
878
879impl ToolAdmin {
880    pub(crate) fn new(control: SessionAdmin) -> Self {
881        Self { control }
882    }
883}
884
885impl ToolAdmin {
886    pub async fn state(&self) -> Result<ToolState> {
887        self.control.tool_state().await
888    }
889
890    pub fn advanced(&self) -> AdvancedToolAdmin {
891        AdvancedToolAdmin {
892            control: self.control.clone(),
893        }
894    }
895
896    /// Toggle Tool Catalog membership for a tool. `present` adds it as a
897    /// member; `!present` removes it. Membership is the execution gate.
898    pub async fn set_membership(
899        &self,
900        tool_id: impl Into<lash_core::ToolId>,
901        present: bool,
902    ) -> Result<u64> {
903        self.control
904            .set_tool_membership(tool_id.into(), present)
905            .await
906    }
907
908    pub async fn set_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
909        self.control.set_tool_membership_many(updates).await
910    }
911
912    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
913        self.control.active_tool_manifests().await
914    }
915
916    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
917        self.control.add_tool_provider(provider).await
918    }
919
920    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
921        self.control.remove_tool_source(handle).await
922    }
923}
924
925#[derive(Clone)]
926pub struct AdvancedToolAdmin {
927    control: SessionAdmin,
928}
929
930impl AdvancedToolAdmin {
931    /// Replace the entire tool-state snapshot.
932    ///
933    /// This is a generation-checked escape hatch for hosts that intentionally
934    /// edit the full snapshot. Prefer `ToolAdmin` membership methods for
935    /// ordinary tool policy changes.
936    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
937        self.control.apply_tool_state(state).await
938    }
939
940    /// Restore a persisted tool-state snapshot, adopting its generation.
941    ///
942    /// Use this when re-applying a snapshot read from durable storage (session
943    /// resume), not an edited delta: it reconstructs the exact persisted surface
944    /// idempotently rather than requiring the snapshot to match the current
945    /// generation. A cold resume of a session whose surface reached generation
946    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
947    ///
948    /// Persisted tools whose source is not currently registered (e.g. a
949    /// detached MCP server) do not fail the restore: they are kept as orphaned
950    /// non-members, listed in the returned [`ToolRestoreReport`], and rebind
951    /// automatically when a source re-advertises the same tool.
952    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
953        self.control.restore_tool_state(state).await
954    }
955}
956
957#[derive(Clone)]
958pub struct SessionCommandAdmin {
959    control: SessionAdmin,
960}
961
962impl SessionCommandAdmin {
963    /// Enqueue an unconditional tool-catalog refresh. The command drains
964    /// asynchronously and recomputes the surface from live sources, so it
965    /// takes no generation guard — any generation observed at enqueue time
966    /// could legitimately have advanced by drain time.
967    pub async fn refresh_tool_catalog(
968        &self,
969        reason: impl Into<String>,
970        idempotency_key: impl Into<String>,
971    ) -> Result<lash_core::SessionCommandReceipt> {
972        self.control
973            .submit_session_command(
974                lash_core::SessionCommand::RefreshToolCatalog {
975                    reason: reason.into(),
976                },
977                idempotency_key,
978            )
979            .await
980    }
981
982    pub async fn reset(
983        &self,
984        reason: impl Into<String>,
985        idempotency_key: impl Into<String>,
986    ) -> Result<lash_core::SessionCommandReceipt> {
987        self.control
988            .submit_session_command(
989                lash_core::SessionCommand::ResetSession {
990                    reason: reason.into(),
991                },
992                idempotency_key,
993            )
994            .await
995    }
996}
997
998/// Session-scoped read controls for Lashlang trigger registrations.
999#[derive(Clone)]
1000pub struct SessionTriggerAdmin {
1001    control: SessionAdmin,
1002}
1003
1004impl SessionTriggerAdmin {
1005    /// Return every trigger registration in the session.
1006    ///
1007    /// This is an admin/introspection view. Source owners should prefer
1008    /// [`Self::by_source_type`] so they only inspect registrations for the
1009    /// concrete source type they own.
1010    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1011        self.control.list_trigger_registrations().await
1012    }
1013
1014    /// Return registrations whose source value has the given host descriptor type.
1015    ///
1016    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1017    /// source uses it to inspect registrations for keys it may schedule and emit.
1018    pub async fn by_source_type(
1019        &self,
1020        source_type: impl Into<lash_core::TriggerEventType>,
1021    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1022        self.control
1023            .trigger_registrations_by_source_type(source_type)
1024            .await
1025    }
1026}
1027
1028#[derive(Clone)]
1029pub struct SessionProcessAdmin {
1030    control: SessionAdmin,
1031}
1032
1033/// Session-scoped view of the global process surface ([`Processes`]).
1034///
1035/// This is thin sugar, not a parallel surface (ADR 0019 grill): every read is
1036/// the global observer pre-filtered by this session's **grant** scope (what the
1037/// session may address), and every mutation delegates to the same runtime
1038/// process path the global surface uses. It speaks the same [`ObservedProcess`]
1039/// vocabulary as [`Processes`]; [`start`](Self::start) returns the
1040/// grant-entry handle row ([`lash_core::ProcessHandleSummary`]), the one row
1041/// type retained for the model/handle contract.
1042impl SessionProcessAdmin {
1043    pub(crate) fn new(control: SessionAdmin) -> Self {
1044        Self { control }
1045    }
1046
1047    /// Grant-scoped read: the global observer filtered to every scope this
1048    /// session may address (root + current frame), deduplicated. One home for
1049    /// the session's read logic — it calls the observer, never reimplements it.
1050    async fn list_granted(
1051        &self,
1052        filter: &lash_core::ProcessListFilter,
1053    ) -> Result<Vec<lash_core::ObservedProcess>> {
1054        // A registry-less runtime has no processes to address; observe empty
1055        // rather than erroring, matching the pre-unification session read.
1056        let Some(observer) = self.control.process_observer_opt() else {
1057            return Ok(Vec::new());
1058        };
1059        let mut seen = std::collections::BTreeSet::new();
1060        let mut out = Vec::new();
1061        for scope in self.control.process_visible_scopes() {
1062            for process in observer.list_granted_to(&scope, filter).await? {
1063                if seen.insert(process.process_id.clone()) {
1064                    out.push(process);
1065                }
1066            }
1067        }
1068        Ok(out)
1069    }
1070
1071    pub async fn start(
1072        &self,
1073        request: lash_core::ProcessStartRequest,
1074        scoped_effect_controller: ScopedEffectController<'_>,
1075    ) -> Result<lash_core::ProcessHandleSummary> {
1076        self.control
1077            .start_process(request, scoped_effect_controller)
1078            .await
1079    }
1080
1081    /// Running processes this session may address.
1082    pub async fn list(&self) -> Result<Vec<lash_core::ObservedProcess>> {
1083        self.list_granted(&lash_core::ProcessListFilter {
1084            status: lash_core::ProcessStatusFilter::Running,
1085            ..lash_core::ProcessListFilter::default()
1086        })
1087        .await
1088    }
1089
1090    /// Every process (any status) this session may address.
1091    pub async fn list_all(&self) -> Result<Vec<lash_core::ObservedProcess>> {
1092        self.list_granted(&lash_core::ProcessListFilter {
1093            status: lash_core::ProcessStatusFilter::Any,
1094            ..lash_core::ProcessListFilter::default()
1095        })
1096        .await
1097    }
1098
1099    /// One process this session may address, if present.
1100    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
1101        Ok(self
1102            .list_all()
1103            .await?
1104            .into_iter()
1105            .find(|process| process.process_id == process_id))
1106    }
1107
1108    pub async fn events(
1109        &self,
1110        process_id: &str,
1111        after_sequence: u64,
1112    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
1113        let Some(observer) = self.control.process_observer_opt() else {
1114            return Ok(Vec::new());
1115        };
1116        observer
1117            .events_after(process_id, after_sequence)
1118            .await
1119            .map_err(Into::into)
1120    }
1121
1122    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
1123        self.control.await_process_output(process_id).await
1124    }
1125
1126    pub async fn signal(
1127        &self,
1128        process_id: &str,
1129        signal_name: impl Into<String>,
1130        signal_id: impl Into<String>,
1131        payload: serde_json::Value,
1132        scoped_effect_controller: ScopedEffectController<'_>,
1133    ) -> Result<lash_core::ProcessEvent> {
1134        self.control
1135            .signal_process(
1136                process_id,
1137                signal_name.into(),
1138                signal_id.into(),
1139                payload,
1140                scoped_effect_controller,
1141            )
1142            .await
1143    }
1144
1145    pub async fn cancel(
1146        &self,
1147        process_id: &str,
1148        scoped_effect_controller: ScopedEffectController<'_>,
1149    ) -> Result<lash_core::ProcessCancelSummary> {
1150        self.control
1151            .cancel_process(process_id, scoped_effect_controller)
1152            .await
1153    }
1154
1155    pub async fn cancel_all(
1156        &self,
1157        scoped_effect_controller: ScopedEffectController<'_>,
1158    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1159        self.control
1160            .cancel_visible_processes(scoped_effect_controller)
1161            .await
1162    }
1163
1164    /// Move this session's handle grants for `process_ids` to another session.
1165    /// Re-homes the addressing grant only; the process itself is global.
1166    pub async fn transfer(
1167        &self,
1168        to_session_id: &str,
1169        process_ids: Vec<String>,
1170        scoped_effect_controller: ScopedEffectController<'_>,
1171    ) -> Result<()> {
1172        self.control
1173            .transfer_process_handles(to_session_id, process_ids, scoped_effect_controller)
1174            .await
1175    }
1176
1177    /// Record an Abandon Request (ADR 0019) against a process this session
1178    /// owns: an authorization the recovery sweep reconciles into `Abandoned`
1179    /// once the owner's lease lapses. Returns the process as observed after.
1180    pub async fn request_abandon(
1181        &self,
1182        process_id: &str,
1183        reason: Option<String>,
1184    ) -> Result<lash_core::ObservedProcess> {
1185        self.control
1186            .request_process_abandon(process_id, reason)
1187            .await
1188    }
1189}
1190
1191#[derive(Clone)]
1192pub struct SessionStateAdmin {
1193    control: SessionAdmin,
1194}
1195
1196impl SessionStateAdmin {
1197    pub async fn export(&self) -> lash_core::SessionSnapshot {
1198        self.control.export_state().await
1199    }
1200
1201    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1202        self.control.append_messages(messages).await
1203    }
1204
1205    pub async fn append_plugin_body(
1206        &self,
1207        plugin_type: impl Into<String>,
1208        body: serde_json::Value,
1209    ) -> Result<()> {
1210        self.control.append_plugin_body(plugin_type, body).await
1211    }
1212
1213    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1214        self.control.set_persisted_state(state).await
1215    }
1216
1217    pub async fn branch_to_node(
1218        &self,
1219        target_leaf: Option<String>,
1220    ) -> Result<lash_core::SessionSnapshot> {
1221        self.control.branch_to_node(target_leaf).await
1222    }
1223
1224    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1225        self.control.persist_current_state().await
1226    }
1227
1228    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1229        self.control.session_state_service().await
1230    }
1231
1232    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1233        self.control.snapshot_execution_state().await
1234    }
1235
1236    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1237        self.control.restore_execution_state(bytes).await
1238    }
1239
1240    pub async fn compact_context(
1241        &self,
1242        instructions: Option<String>,
1243        scoped_effect_controller: ScopedEffectController<'_>,
1244    ) -> Result<bool> {
1245        self.control
1246            .compact_context(instructions, scoped_effect_controller)
1247            .await
1248    }
1249}
1250
1251#[derive(Clone)]
1252pub struct PluginOperations {
1253    pub(crate) control: SessionAdmin,
1254}
1255
1256impl PluginOperations {
1257    pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1258        let (_plugin_id, output) = self
1259            .control
1260            .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1261            .await?;
1262        decode_plugin_output::<Op>(output)
1263    }
1264
1265    pub async fn query_raw(
1266        &self,
1267        name: &str,
1268        args: serde_json::Value,
1269    ) -> Result<(String, serde_json::Value)> {
1270        self.control.query_plugin_raw(name, args).await
1271    }
1272
1273    pub async fn run_command<Op: lash_core::PluginCommand>(
1274        &self,
1275        args: Op::Args,
1276    ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1277        let receipt = self
1278            .control
1279            .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1280            .await?;
1281        Ok(lash_core::PluginCommandReceipt {
1282            output: decode_plugin_output::<Op>(receipt.output)?,
1283            events: receipt.events,
1284            pending_turn_inputs: receipt.pending_turn_inputs,
1285        })
1286    }
1287
1288    pub async fn run_command_raw(
1289        &self,
1290        name: &str,
1291        args: serde_json::Value,
1292    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1293        self.control.run_plugin_command_raw(name, args).await
1294    }
1295
1296    pub async fn run_task<Op: lash_core::PluginTask>(
1297        &self,
1298        args: Op::Args,
1299    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1300        self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1301            .await
1302    }
1303
1304    pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1305        &self,
1306        args: Op::Args,
1307        cancellation_token: CancellationToken,
1308    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1309        let receipt = self
1310            .control
1311            .run_plugin_task_raw_with_cancel(
1312                Op::NAME,
1313                encode_plugin_args::<Op>(args)?,
1314                cancellation_token,
1315            )
1316            .await?;
1317        Ok(lash_core::PluginTaskReceipt {
1318            output: decode_plugin_output::<Op>(receipt.output)?,
1319            events: receipt.events,
1320            pending_turn_inputs: receipt.pending_turn_inputs,
1321        })
1322    }
1323
1324    pub async fn run_task_raw(
1325        &self,
1326        name: &str,
1327        args: serde_json::Value,
1328    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1329        self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1330            .await
1331    }
1332
1333    pub async fn run_task_raw_with_cancel(
1334        &self,
1335        name: &str,
1336        args: serde_json::Value,
1337        cancellation_token: CancellationToken,
1338    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1339        self.control
1340            .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1341            .await
1342    }
1343}
1344
1345fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1346    serde_json::to_value(args).map_err(|err| {
1347        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1348            "invalid {} args: {err}",
1349            Op::NAME
1350        )))
1351    })
1352}
1353
1354fn decode_plugin_output<Op: lash_core::PluginOperation>(
1355    output: serde_json::Value,
1356) -> Result<Op::Output> {
1357    serde_json::from_value(output).map_err(|err| {
1358        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1359            "invalid {} output: {err}",
1360            Op::NAME
1361        )))
1362    })
1363}
1364
1365#[derive(Clone)]
1366pub struct ChildSessionAdmin {
1367    control: SessionAdmin,
1368}
1369
1370impl ChildSessionAdmin {
1371    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1372        self.control.create_child_session(request).await
1373    }
1374
1375    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1376        self.control.close_child_session(session_id).await
1377    }
1378
1379    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1380        self.control.activate_managed_session(session_id).await
1381    }
1382}
1383
1384#[derive(Clone)]
1385pub struct InjectionAdmin {
1386    control: SessionAdmin,
1387}
1388
1389impl InjectionAdmin {
1390    pub async fn inject_turn_input(
1391        &self,
1392        turn_id: &str,
1393        id: Option<String>,
1394        message: PluginMessage,
1395    ) -> Result<()> {
1396        self.control.inject_turn_input(turn_id, id, message).await
1397    }
1398
1399    pub async fn inject_turn_inputs_for_turn(
1400        &self,
1401        turn_id: &str,
1402        messages: Vec<lash_core::InjectedTurnInput>,
1403    ) -> Result<()> {
1404        self.control
1405            .inject_turn_inputs_for_turn(turn_id, messages)
1406            .await
1407    }
1408}
1409
1410#[derive(Clone)]
1411pub struct ProtocolAdmin {
1412    control: SessionAdmin,
1413}
1414
1415impl ProtocolAdmin {
1416    pub async fn apply_session_extension(
1417        &self,
1418        extension: lash_core::ProtocolSessionExtensionHandle,
1419    ) -> Result<()> {
1420        self.control
1421            .apply_protocol_session_extension(extension)
1422            .await
1423    }
1424}