Skip to main content

lash/
admin.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let drivers = self.core.work_driver.drivers().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            self.core.env.process_registry.clone(),
47            drivers.process,
48        );
49        router
50            .emit(request, scoped_effect_controller.controller())
51            .await
52            .map_err(Into::into)
53    }
54
55    pub async fn subscriptions(
56        &self,
57        filter: lash_core::TriggerSubscriptionFilter,
58    ) -> Result<Vec<lash_core::TriggerRegistration>> {
59        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60            EmbedError::Plugin(lash_core::PluginError::Session(
61                "trigger store is unavailable in this runtime".to_string(),
62            ))
63        })?;
64        let records = store.list_subscriptions(filter).await?;
65        Ok(records
66            .iter()
67            .map(lash_core::TriggerRegistration::from)
68            .collect())
69    }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74    pub(crate) core: LashCore,
75}
76
77impl Processes {
78    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79        self.core
80            .env
81            .process_registry
82            .as_ref()
83            .cloned()
84            .ok_or_else(|| {
85                EmbedError::Plugin(lash_core::PluginError::Session(
86                    "process registry is unavailable in this runtime".to_string(),
87                ))
88            })
89    }
90
91    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93    }
94
95    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96        let effect_id = command.effect_id();
97        lash_core::RuntimeInvocation::effect(
98            lash_core::runtime::RuntimeScope::new("runtime"),
99            effect_id.clone(),
100            lash_core::RuntimeEffectKind::Process,
101            effect_id,
102        )
103    }
104
105    async fn run_command(
106        &self,
107        command: lash_core::ProcessCommand,
108        scoped_effect_controller: ScopedEffectController<'_>,
109    ) -> Result<lash_core::ProcessEffectOutcome> {
110        let registry = self.registry()?;
111        let invocation = Self::process_invocation(&command);
112        let outcome = scoped_effect_controller
113            .controller()
114            .execute_effect(
115                lash_core::RuntimeEffectEnvelope::new(
116                    invocation,
117                    lash_core::RuntimeEffectCommand::process(command),
118                ),
119                lash_core::RuntimeEffectLocalExecutor::processes(registry),
120            )
121            .await
122            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123        match outcome {
124            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126                "process effect returned non-process outcome".to_string(),
127            ))),
128        }
129    }
130
131    pub async fn start(
132        &self,
133        request: lash_core::ProcessStartRequest,
134        scoped_effect_controller: ScopedEffectController<'_>,
135    ) -> Result<lash_core::ProcessRecord> {
136        let env_ref = match request.env_spec.as_ref() {
137            Some(env_spec) => Some(
138                lash_core::runtime::persist_process_execution_env(
139                    self.core.env.core.durability.process_env_store.as_ref(),
140                    env_spec,
141                )
142                .await?,
143            ),
144            None => None,
145        };
146        let grant = request.grant.clone();
147        let registration = request.into_registration(env_ref);
148        let command = lash_core::ProcessCommand::Start {
149            registration,
150            grant,
151            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152        };
153        let outcome = self
154            .run_command(command, scoped_effect_controller.clone())
155            .await?;
156        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158                "process start returned the wrong outcome".to_string(),
159            )));
160        };
161        if let Some(driver) = self.core.work_driver.drivers().await.process {
162            driver.claim_and_run_pending("admin_process_start").await?;
163        }
164        Ok(record)
165    }
166
167    pub async fn list(
168        &self,
169        filter: &lash_core::ProcessListFilter,
170    ) -> Result<Vec<lash_core::ObservedProcess>> {
171        self.make_observer()?.list(filter).await.map_err(Into::into)
172    }
173
174    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175        Ok(self.make_observer()?.process(process_id).await)
176    }
177
178    pub async fn events(
179        &self,
180        process_id: &str,
181        after_sequence: u64,
182    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183        self.make_observer()?
184            .events_after(process_id, after_sequence)
185            .await
186            .map_err(Into::into)
187    }
188
189    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190        self.registry()?
191            .await_process(process_id)
192            .await
193            .map_err(Into::into)
194    }
195
196    pub async fn cancel(
197        &self,
198        process_id: &str,
199        scoped_effect_controller: ScopedEffectController<'_>,
200    ) -> Result<lash_core::ProcessCancelSummary> {
201        let command = lash_core::ProcessCommand::Cancel {
202            process_id: process_id.to_string(),
203            reason: Some("requested by host".to_string()),
204        };
205        let outcome = self
206            .run_command(command, scoped_effect_controller.clone())
207            .await?;
208        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210                "process cancel returned the wrong outcome".to_string(),
211            )));
212        };
213        Ok(lash_core::ProcessCancelSummary::from_record(record))
214    }
215
216    pub async fn signal(
217        &self,
218        process_id: &str,
219        signal_name: impl Into<String>,
220        signal_id: impl Into<String>,
221        request: lash_core::ProcessEventAppendRequest,
222        scoped_effect_controller: ScopedEffectController<'_>,
223    ) -> Result<lash_core::ProcessEvent> {
224        let signal_name = signal_name.into();
225        let event_type = request.event_type.clone();
226        let payload = request.payload.clone();
227        let command = lash_core::ProcessCommand::Signal {
228            process_id: process_id.to_string(),
229            signal_name: signal_name.clone(),
230            signal_id: signal_id.into(),
231            request,
232        };
233        let outcome = self
234            .run_command(command, scoped_effect_controller.clone())
235            .await?;
236        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238                "process signal returned the wrong outcome".to_string(),
239            )));
240        };
241        let registry = self.registry()?;
242        let waiting_ordinal =
243            registry
244                .get_process(process_id)
245                .await
246                .and_then(|record| match record.wait {
247                    Some(lash_core::WaitState {
248                        kind:
249                            lash_core::WaitKind::Signal {
250                                name,
251                                event_type: wait_event_type,
252                                ordinal,
253                                ..
254                            },
255                        ..
256                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257                    _ => None,
258                });
259        let ordinal = match waiting_ordinal {
260            Some(ordinal) => ordinal,
261            None => {
262                registry
263                    .count_events_through(process_id, &event_type, event.sequence)
264                    .await?
265            }
266        };
267        if ordinal > 0 {
268            let key = scoped_effect_controller
269                .controller()
270                .await_event_key(
271                    &lash_core::ExecutionScope::process(process_id),
272                    lash_core::AwaitEventWaitIdentity::process_signal(
273                        process_id,
274                        &signal_name,
275                        ordinal,
276                    ),
277                )
278                .await
279                .map_err(|err| {
280                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281                })?;
282            let _ = scoped_effect_controller
283                .controller()
284                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285                .await
286                .map_err(|err| {
287                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288                })?;
289        }
290        Ok(event)
291    }
292
293    pub async fn session_snapshot(
294        &self,
295        session_id: impl Into<String>,
296    ) -> Result<lash_core::ProcessWorkSnapshot> {
297        self.make_observer()?
298            .snapshot_for_session(session_id)
299            .await
300            .map_err(Into::into)
301    }
302
303    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304        self.make_observer()
305    }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310    pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314    pub fn config(&self) -> SessionConfigAdmin {
315        SessionConfigAdmin {
316            control: self.clone(),
317        }
318    }
319
320    pub fn tools(&self) -> ToolAdmin {
321        ToolAdmin {
322            control: self.clone(),
323        }
324    }
325
326    pub fn commands(&self) -> SessionCommandAdmin {
327        SessionCommandAdmin {
328            control: self.clone(),
329        }
330    }
331
332    pub fn triggers(&self) -> SessionTriggerAdmin {
333        SessionTriggerAdmin {
334            control: self.clone(),
335        }
336    }
337
338    pub fn state(&self) -> SessionStateAdmin {
339        SessionStateAdmin {
340            control: self.clone(),
341        }
342    }
343
344    pub fn children(&self) -> ChildSessionAdmin {
345        ChildSessionAdmin {
346            control: self.clone(),
347        }
348    }
349
350    pub fn injection(&self) -> InjectionAdmin {
351        InjectionAdmin {
352            control: self.clone(),
353        }
354    }
355
356    pub fn protocol(&self) -> ProtocolAdmin {
357        ProtocolAdmin {
358            control: self.clone(),
359        }
360    }
361
362    pub fn processes(&self) -> SessionProcessAdmin {
363        SessionProcessAdmin {
364            control: self.clone(),
365        }
366    }
367
368    /// Run `f` against the locked runtime writer, then publish the resulting
369    /// observation. The body is the canonical `lock → call → publish_from`
370    /// stamp shared by nearly every mutating control method; publish happens
371    /// unconditionally once the closure returns.
372    async fn with_writer<F, T>(&self, f: F) -> T
373    where
374        F: AsyncFnOnce(&mut LashRuntime) -> T,
375    {
376        let writer = self.runtime.writer();
377        let mut runtime = writer.lock().await;
378        let value = f(&mut runtime).await;
379        self.runtime.publish_from(&runtime);
380        value
381    }
382
383    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384        self.update_session_config(patch.provider, patch.model, patch.prompt)
385            .await?;
386        Ok(())
387    }
388
389    async fn update_session_config(
390        &self,
391        provider: Option<ProviderHandle>,
392        model: Option<lash_core::ModelSpec>,
393        prompt: Option<PromptLayer>,
394    ) -> Result<()> {
395        self.with_writer(async |runtime: &mut LashRuntime| {
396            runtime.update_session_config(provider, model, prompt).await;
397        })
398        .await;
399        Ok(())
400    }
401
402    async fn export_state(&self) -> lash_core::SessionSnapshot {
403        self.runtime.observe().read_view.to_snapshot()
404    }
405
406    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407        self.with_writer(async |runtime: &mut LashRuntime| {
408            runtime
409                .append_session_nodes(lash_core::AppendSessionNodesRequest {
410                    nodes: messages
411                        .into_iter()
412                        .map(lash_core::SessionAppendNode::message)
413                        .collect(),
414                    requires_ancestor_node_id: None,
415                })
416                .await
417                .map(|_| ())
418                .map_err(Into::into)
419        })
420        .await
421    }
422
423    async fn append_plugin_body(
424        &self,
425        plugin_type: impl Into<String>,
426        body: serde_json::Value,
427    ) -> Result<()> {
428        self.with_writer(async |runtime: &mut LashRuntime| {
429            runtime
430                .append_session_nodes(lash_core::AppendSessionNodesRequest {
431                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432                    requires_ancestor_node_id: None,
433                })
434                .await
435                .map(|_| ())
436                .map_err(Into::into)
437        })
438        .await
439    }
440
441    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442        self.with_writer(async |runtime: &mut LashRuntime| {
443            runtime.set_persisted_state(state).map_err(Into::into)
444        })
445        .await
446    }
447
448    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449        self.with_writer(async |runtime: &mut LashRuntime| {
450            runtime.set_prompt_template(template).await;
451        })
452        .await;
453        Ok(())
454    }
455
456    async fn clear_prompt_template(&self) -> Result<()> {
457        self.with_writer(async |runtime: &mut LashRuntime| {
458            runtime.clear_prompt_template().await;
459        })
460        .await;
461        Ok(())
462    }
463
464    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465        self.with_writer(async |runtime: &mut LashRuntime| {
466            runtime.add_prompt_contribution(contribution).await;
467        })
468        .await;
469        Ok(())
470    }
471
472    async fn replace_prompt_slot(
473        &self,
474        slot: PromptSlot,
475        contributions: impl IntoIterator<Item = PromptContribution>,
476    ) -> Result<()> {
477        self.with_writer(async |runtime: &mut LashRuntime| {
478            runtime.replace_prompt_slot(slot, contributions).await;
479        })
480        .await;
481        Ok(())
482    }
483
484    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485        self.with_writer(async |runtime: &mut LashRuntime| {
486            runtime.clear_prompt_slot(slot).await;
487        })
488        .await;
489        Ok(())
490    }
491
492    async fn apply_protocol_session_extension(
493        &self,
494        extension: lash_core::ProtocolSessionExtensionHandle,
495    ) -> Result<()> {
496        self.with_writer(async |runtime: &mut LashRuntime| {
497            runtime
498                .apply_protocol_session_extension(extension)
499                .await
500                .map_err(Into::into)
501        })
502        .await
503    }
504
505    async fn branch_to_node(
506        &self,
507        target_leaf: Option<String>,
508    ) -> Result<lash_core::SessionSnapshot> {
509        self.with_writer(async |runtime: &mut LashRuntime| {
510            runtime
511                .branch_to_node(target_leaf)
512                .await
513                .map_err(Into::into)
514        })
515        .await
516    }
517
518    async fn await_background_work(&self) -> Result<()> {
519        self.with_writer(async |runtime: &mut LashRuntime| {
520            runtime.await_background_work().await.map_err(Into::into)
521        })
522        .await
523    }
524
525    async fn refresh_tool_catalog(&self) -> Result<()> {
526        self.with_writer(async |runtime: &mut LashRuntime| {
527            runtime
528                .refresh_session_tool_catalog()
529                .await
530                .map_err(Into::into)
531        })
532        .await
533    }
534
535    async fn submit_session_command(
536        &self,
537        command: lash_core::SessionCommand,
538        idempotency_key: impl Into<String>,
539    ) -> Result<lash_core::SessionCommandReceipt> {
540        let idempotency_key = idempotency_key.into();
541        self.with_writer(async |runtime: &mut LashRuntime| {
542            runtime
543                .submit_session_command(command, idempotency_key)
544                .await
545                .map_err(Into::into)
546        })
547        .await
548    }
549
550    async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551        self.with_writer(async |runtime: &mut LashRuntime| {
552            runtime
553                .list_trigger_registrations()
554                .await
555                .map_err(Into::into)
556        })
557        .await
558    }
559
560    async fn trigger_registrations_by_source_type(
561        &self,
562        source_type: impl Into<lash_core::TriggerEventType>,
563    ) -> Result<Vec<lash_core::TriggerRegistration>> {
564        self.with_writer(async |runtime: &mut LashRuntime| {
565            runtime
566                .trigger_registrations_by_source_type(source_type)
567                .await
568                .map_err(Into::into)
569        })
570        .await
571    }
572
573    async fn invoke_plugin_action(
574        &self,
575        name: &str,
576        args: serde_json::Value,
577    ) -> Result<ToolResult> {
578        let session_id = self.runtime.observe().session_id().to_string();
579        let writer = self.runtime.writer();
580        writer
581            .lock()
582            .await
583            .invoke_plugin_action(name, args, Some(session_id))
584            .await
585            .map_err(Into::into)
586    }
587
588    async fn call_plugin_action<Op: lash_core::PluginAction>(
589        &self,
590        args: Op::Args,
591    ) -> Result<Op::Output> {
592        let result = self
593            .invoke_plugin_action(
594                Op::NAME,
595                serde_json::to_value(args).map_err(|err| {
596                    EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
597                        "invalid {} args: {err}",
598                        Op::NAME
599                    )))
600                })?,
601            )
602            .await?;
603        let Some(output) = result.as_done_output() else {
604            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
605                "{} returned a pending result where completed output is required",
606                Op::NAME
607            ))));
608        };
609        if !output.is_success() {
610            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
611                "{} failed: {}",
612                Op::NAME,
613                output.value_for_projection()
614            ))));
615        }
616        serde_json::from_value(output.value_for_projection()).map_err(|err| {
617            EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
618                "invalid {} output: {err}",
619                Op::NAME
620            )))
621        })
622    }
623
624    async fn compact_context(
625        &self,
626        instructions: Option<String>,
627        scoped_effect_controller: ScopedEffectController<'_>,
628    ) -> Result<bool> {
629        self.with_writer(async |runtime: &mut LashRuntime| {
630            runtime
631                .compact_context(instructions, scoped_effect_controller)
632                .await
633                .map_err(Into::into)
634        })
635        .await
636    }
637
638    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
639        self.with_writer(async |runtime: &mut LashRuntime| {
640            runtime.await_background_work().await?;
641            Ok(runtime.export_persisted_state())
642        })
643        .await
644    }
645
646    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
647        Ok(self.runtime.observe().list_process_handles().await)
648    }
649
650    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
651        Ok(self.runtime.observe().list_all_process_handles().await)
652    }
653
654    async fn start_process(
655        &self,
656        request: lash_core::ProcessStartRequest,
657        scoped_effect_controller: ScopedEffectController<'_>,
658    ) -> Result<lash_core::ProcessHandleSummary> {
659        let writer = self.runtime.writer();
660        let runtime = writer.lock().await;
661        let session_id = runtime.session_id().to_string();
662        let processes = runtime.process_service()?;
663        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
664        let summary = processes
665            .start_from_request(&session_id, request, scope)
666            .await
667            .map_err(EmbedError::Plugin)?;
668        self.runtime.record_process_changed(
669            SessionProcessEventKind::Started,
670            vec![summary.process_id.clone()],
671        );
672        Ok(summary)
673    }
674
675    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
676        self.runtime
677            .writer()
678            .lock()
679            .await
680            .session_state_service()
681            .map_err(Into::into)
682    }
683
684    async fn cancel_process(
685        &self,
686        process_id: &str,
687        scoped_effect_controller: ScopedEffectController<'_>,
688    ) -> Result<lash_core::ProcessCancelSummary> {
689        let writer = self.runtime.writer();
690        let runtime = writer.lock().await;
691        let session_id = runtime.session_id().to_string();
692        let processes = runtime.process_service()?;
693        let cancel_ability = runtime.process_cancel_ability();
694        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
695        let summary = cancel_ability
696            .cancel_summary(
697                processes.as_ref(),
698                lash_core::ProcessCancelRequest::new(
699                    &session_id,
700                    process_id,
701                    scope,
702                    lash_core::ProcessCancelSource::HostApi,
703                )
704                .with_reason("requested by host API"),
705            )
706            .await
707            .map_err(EmbedError::Plugin)?;
708        self.runtime.record_process_changed(
709            SessionProcessEventKind::Cancelled,
710            vec![summary.process_id.clone()],
711        );
712        Ok(summary)
713    }
714
715    async fn cancel_visible_processes(
716        &self,
717        scoped_effect_controller: ScopedEffectController<'_>,
718    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
719        let writer = self.runtime.writer();
720        let runtime = writer.lock().await;
721        let session_id = runtime.session_id().to_string();
722        let processes = runtime.process_service()?;
723        let cancel_ability = runtime.process_cancel_ability();
724        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
725        let summaries = cancel_ability
726            .cancel_all_visible(
727                processes.as_ref(),
728                lash_core::ProcessCancelAllRequest::new(
729                    &session_id,
730                    scope,
731                    lash_core::ProcessCancelSource::HostApi,
732                )
733                .with_reason("requested by host API"),
734            )
735            .await
736            .map_err(EmbedError::Plugin)?;
737        self.runtime.record_process_changed(
738            SessionProcessEventKind::Cancelled,
739            summaries
740                .iter()
741                .map(|summary| summary.process_id.clone())
742                .collect(),
743        );
744        Ok(summaries)
745    }
746
747    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
748        self.with_writer(async |runtime: &mut LashRuntime| {
749            runtime.snapshot_execution_state().await.map_err(Into::into)
750        })
751        .await
752    }
753
754    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
755        self.with_writer(async |runtime: &mut LashRuntime| {
756            runtime
757                .restore_execution_state(bytes)
758                .await
759                .map_err(Into::into)
760        })
761        .await
762    }
763
764    async fn tool_state(&self) -> Result<ToolState> {
765        self.runtime.observe().tool_state.clone().ok_or_else(|| {
766            EmbedError::Session(SessionError::Protocol(
767                "runtime session not available".to_string(),
768            ))
769        })
770    }
771
772    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
773        self.with_writer(async |runtime: &mut LashRuntime| {
774            runtime
775                .apply_tool_state(state)
776                .await
777                .map_err(EmbedError::from)
778        })
779        .await
780    }
781
782    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
783        self.with_writer(async |runtime: &mut LashRuntime| {
784            runtime
785                .restore_tool_state(state)
786                .await
787                .map_err(EmbedError::from)
788        })
789        .await
790    }
791
792    async fn set_tool_availability(
793        &self,
794        tool_id: lash_core::ToolId,
795        availability: ToolAvailability,
796    ) -> Result<u64> {
797        self.set_tool_availability_many(&[(tool_id, availability)])
798            .await
799    }
800
801    async fn set_tool_availability_many(
802        &self,
803        updates: &[(lash_core::ToolId, ToolAvailability)],
804    ) -> Result<u64> {
805        let mut state = self.tool_state().await?;
806        for (tool_id, availability) in updates {
807            state
808                .set_availability(tool_id, Some(*availability))
809                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
810        }
811        self.apply_tool_state(state).await
812    }
813
814    async fn clear_tool_availability_override(&self, tool_id: lash_core::ToolId) -> Result<u64> {
815        let mut state = self.tool_state().await?;
816        state
817            .set_availability(&tool_id, None)
818            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
819        self.apply_tool_state(state).await
820    }
821
822    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
823        Ok(self.tool_state().await?.tool_manifests())
824    }
825
826    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
827        let tool_registry = self.tool_registry().await?;
828        let handle = tool_registry
829            .add_tool_provider(provider)
830            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
831        self.refresh_tool_catalog().await?;
832        Ok(handle)
833    }
834
835    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
836        let tool_registry = self.tool_registry().await?;
837        let generation = tool_registry
838            .remove_source(handle)
839            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
840        self.refresh_tool_catalog().await?;
841        Ok(generation)
842    }
843
844    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
845        let writer = self.runtime.writer();
846        let runtime = writer.lock().await;
847        let lifecycle = runtime.session_lifecycle_service()?;
848        lifecycle.create_session(request).await.map_err(Into::into)
849    }
850
851    async fn close_child_session(&self, session_id: &str) -> Result<()> {
852        let writer = self.runtime.writer();
853        let runtime = writer.lock().await;
854        let lifecycle = runtime.session_lifecycle_service()?;
855        lifecycle
856            .close_session(session_id)
857            .await
858            .map_err(Into::into)
859    }
860
861    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
862        self.with_writer(async |runtime: &mut LashRuntime| {
863            runtime
864                .activate_managed_session(session_id)
865                .await
866                .map_err(Into::into)
867        })
868        .await
869    }
870
871    async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
872        self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
873            .await
874    }
875
876    async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
877        for input in messages {
878            let source_key = input.id.map(|id| format!("injection:{id}"));
879            let turn_input = turn_input_from_plugin_message(input.message);
880            self.runtime
881                .enqueue_turn_input(
882                    turn_input,
883                    lash_core::DeliveryPolicy::EarliestSafeBoundary,
884                    lash_core::SlotPolicy::Join,
885                    source_key,
886                )
887                .await
888                .map(|_| ())
889                .map_err(EmbedError::Runtime)?;
890        }
891        Ok(())
892    }
893
894    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
895        self.runtime
896            .writer()
897            .lock()
898            .await
899            .plugin_session()
900            .map(|session| session.tool_registry())
901            .ok_or_else(|| {
902                EmbedError::Session(SessionError::Protocol(
903                    "tool registry is unavailable in this runtime session".to_string(),
904                ))
905            })
906    }
907}
908
909fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
910    let mut input = TurnInput::empty();
911    if !message.content.is_empty() {
912        input.items.push(InputItem::Text {
913            text: message.content,
914        });
915    }
916    for (index, bytes) in message.images.into_iter().enumerate() {
917        let id = format!("injected-image-{index}");
918        input.items.push(InputItem::ImageRef { id: id.clone() });
919        input.image_blobs.insert(id, bytes);
920    }
921    input
922}
923
/// Session configuration and prompt controls.
///
/// A thin facade: every method forwards its arguments unchanged to a
/// `SessionAdmin` method and returns that method's result.
#[derive(Clone)]
pub struct SessionConfigAdmin {
    control: SessionAdmin,
}

impl SessionConfigAdmin {
    /// Apply `patch` through `SessionAdmin::update_config`.
    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
        self.control.update_config(patch).await
    }

    /// Forward an optional provider, model and prompt layer to
    /// `SessionAdmin::update_session_config`.
    ///
    /// NOTE(review): `None` presumably leaves the corresponding setting
    /// untouched — confirm against `SessionAdmin::update_session_config`.
    pub async fn update_session_config(
        &self,
        provider: Option<ProviderHandle>,
        model: Option<lash_core::ModelSpec>,
        prompt: Option<PromptLayer>,
    ) -> Result<()> {
        self.control
            .update_session_config(provider, model, prompt)
            .await
    }

    /// Forward `template` to `SessionAdmin::set_prompt_template`.
    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
        self.control.set_prompt_template(template).await
    }

    /// Forward to `SessionAdmin::clear_prompt_template`.
    pub async fn clear_prompt_template(&self) -> Result<()> {
        self.control.clear_prompt_template().await
    }

    /// Forward `contribution` to `SessionAdmin::add_prompt_contribution`.
    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
        self.control.add_prompt_contribution(contribution).await
    }

    /// Forward `slot` and `contributions` to `SessionAdmin::replace_prompt_slot`.
    ///
    /// The iterator is passed through as-is; it is not collected here.
    pub async fn replace_prompt_slot(
        &self,
        slot: PromptSlot,
        contributions: impl IntoIterator<Item = PromptContribution>,
    ) -> Result<()> {
        self.control.replace_prompt_slot(slot, contributions).await
    }

    /// Forward `slot` to `SessionAdmin::clear_prompt_slot`.
    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
        self.control.clear_prompt_slot(slot).await
    }
}
969
970#[derive(Clone)]
971pub struct ToolAdmin {
972    control: SessionAdmin,
973}
974
975impl ToolAdmin {
976    pub(crate) fn new(control: SessionAdmin) -> Self {
977        Self { control }
978    }
979}
980
981impl ToolAdmin {
982    pub async fn state(&self) -> Result<ToolState> {
983        self.control.tool_state().await
984    }
985
986    pub fn advanced(&self) -> AdvancedToolAdmin {
987        AdvancedToolAdmin {
988            control: self.control.clone(),
989        }
990    }
991
992    pub async fn set_availability(
993        &self,
994        tool_id: impl Into<lash_core::ToolId>,
995        availability: ToolAvailability,
996    ) -> Result<u64> {
997        self.control
998            .set_tool_availability(tool_id.into(), availability)
999            .await
1000    }
1001
1002    pub async fn set_availability_many(
1003        &self,
1004        updates: &[(lash_core::ToolId, ToolAvailability)],
1005    ) -> Result<u64> {
1006        self.control.set_tool_availability_many(updates).await
1007    }
1008
1009    pub async fn clear_availability_override(
1010        &self,
1011        tool_id: impl Into<lash_core::ToolId>,
1012    ) -> Result<u64> {
1013        self.control
1014            .clear_tool_availability_override(tool_id.into())
1015            .await
1016    }
1017
1018    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1019        self.control.active_tool_manifests().await
1020    }
1021
1022    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1023        self.control.add_tool_provider(provider).await
1024    }
1025
1026    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1027        self.control.remove_tool_source(handle).await
1028    }
1029}
1030
/// Whole-snapshot tool-state controls, obtained from `ToolAdmin::advanced`.
///
/// Both methods forward the snapshot unchanged to `SessionAdmin`.
#[derive(Clone)]
pub struct AdvancedToolAdmin {
    control: SessionAdmin,
}

impl AdvancedToolAdmin {
    /// Replace the entire tool-state snapshot.
    ///
    /// This is a generation-checked escape hatch for hosts that intentionally
    /// edit the full snapshot. Prefer `ToolAdmin` availability methods for
    /// ordinary tool policy changes.
    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
        self.control.apply_tool_state(state).await
    }

    /// Restore a persisted tool-state snapshot, adopting its generation.
    ///
    /// Use this when re-applying a snapshot read from durable storage (session
    /// resume), not an edited delta: it reconstructs the exact persisted surface
    /// idempotently rather than requiring the snapshot to match the current
    /// generation. A cold resume of a session whose surface reached generation
    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
    ///
    /// Persisted tools whose source is not currently registered (e.g. a
    /// detached MCP server) do not fail the restore: they are kept as orphans,
    /// forced `Off`, listed in the returned [`ToolRestoreReport`], and rebind
    /// automatically when a source re-advertises the same tool.
    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
        self.control.restore_tool_state(state).await
    }
}
1062
1063#[derive(Clone)]
1064pub struct SessionCommandAdmin {
1065    control: SessionAdmin,
1066}
1067
1068impl SessionCommandAdmin {
1069    /// Enqueue an unconditional tool-catalog refresh. The command drains
1070    /// asynchronously and recomputes the surface from live sources, so it
1071    /// takes no generation guard — any generation observed at enqueue time
1072    /// could legitimately have advanced by drain time.
1073    pub async fn refresh_tool_catalog(
1074        &self,
1075        reason: impl Into<String>,
1076        idempotency_key: impl Into<String>,
1077    ) -> Result<lash_core::SessionCommandReceipt> {
1078        self.control
1079            .submit_session_command(
1080                lash_core::SessionCommand::RefreshToolCatalog {
1081                    reason: reason.into(),
1082                },
1083                idempotency_key,
1084            )
1085            .await
1086    }
1087
1088    pub async fn reset(
1089        &self,
1090        reason: impl Into<String>,
1091        idempotency_key: impl Into<String>,
1092    ) -> Result<lash_core::SessionCommandReceipt> {
1093        self.control
1094            .submit_session_command(
1095                lash_core::SessionCommand::ResetSession {
1096                    reason: reason.into(),
1097                },
1098                idempotency_key,
1099            )
1100            .await
1101    }
1102}
1103
/// Session-scoped read controls for Lashlang trigger registrations.
///
/// Both methods are read-only lookups forwarded to `SessionAdmin`.
#[derive(Clone)]
pub struct SessionTriggerAdmin {
    control: SessionAdmin,
}

impl SessionTriggerAdmin {
    /// Return every trigger registration in the session.
    ///
    /// This is an admin/introspection view. Source owners should prefer
    /// [`Self::by_source_type`] so they only inspect registrations for the
    /// concrete source type they own.
    ///
    /// Forwards to `SessionAdmin::list_trigger_registrations`.
    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
        self.control.list_trigger_registrations().await
    }

    /// Return registrations whose source value has the given host descriptor type.
    ///
    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
    /// source uses it to inspect registrations for keys it may schedule and emit.
    ///
    /// `source_type` is passed through unconverted to
    /// `SessionAdmin::trigger_registrations_by_source_type`.
    pub async fn by_source_type(
        &self,
        source_type: impl Into<lash_core::TriggerEventType>,
    ) -> Result<Vec<lash_core::TriggerRegistration>> {
        self.control
            .trigger_registrations_by_source_type(source_type)
            .await
    }
}
1133
/// Process controls for a session, implemented as a facade over `SessionAdmin`.
#[derive(Clone)]
pub struct SessionProcessAdmin {
    control: SessionAdmin,
}

impl SessionProcessAdmin {
    /// Wrap `control` in a `SessionProcessAdmin`.
    pub(crate) fn new(control: SessionAdmin) -> Self {
        Self { control }
    }

    /// Start a process by forwarding `request` and the scoped effect
    /// controller to `SessionAdmin::start_process`.
    pub async fn start(
        &self,
        request: lash_core::ProcessStartRequest,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessHandleSummary> {
        self.control
            .start_process(request, scoped_effect_controller)
            .await
    }

    /// Return the summaries from `SessionAdmin::list_process_handles`.
    ///
    /// NOTE(review): how this set differs from [`Self::list_all`] is decided
    /// by `SessionAdmin` and is not visible here — confirm before relying on it.
    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
        self.control.list_process_handles().await
    }

    /// Return the summaries from `SessionAdmin::list_all_process_handles`.
    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
        self.control.list_all_process_handles().await
    }

    /// Await `SessionAdmin::await_background_work`.
    pub async fn await_all(&self) -> Result<()> {
        self.control.await_background_work().await
    }

    /// Cancel the process identified by `process_id` via
    /// `SessionAdmin::cancel_process`.
    pub async fn cancel(
        &self,
        process_id: &str,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessCancelSummary> {
        self.control
            .cancel_process(process_id, scoped_effect_controller)
            .await
    }

    /// Cancel processes via `SessionAdmin::cancel_visible_processes`.
    ///
    /// Despite the `_all` name, the callee is the "visible processes" variant;
    /// one summary is returned per cancellation it reports.
    pub async fn cancel_all(
        &self,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
        self.control
            .cancel_visible_processes(scoped_effect_controller)
            .await
    }
}
1185
/// Session state controls (export, persistence, branching, execution
/// snapshots, compaction), implemented as a facade over `SessionAdmin`.
///
/// Every method forwards its arguments unchanged and returns the callee's result.
#[derive(Clone)]
pub struct SessionStateAdmin {
    control: SessionAdmin,
}

impl SessionStateAdmin {
    /// Return the snapshot produced by `SessionAdmin::export_state`.
    ///
    /// Unlike the other methods on this type, this one is infallible.
    pub async fn export(&self) -> lash_core::SessionSnapshot {
        self.control.export_state().await
    }

    /// Forward `messages` to `SessionAdmin::append_messages`.
    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
        self.control.append_messages(messages).await
    }

    /// Forward a plugin type name and its JSON body to
    /// `SessionAdmin::append_plugin_body`.
    pub async fn append_plugin_body(
        &self,
        plugin_type: impl Into<String>,
        body: serde_json::Value,
    ) -> Result<()> {
        self.control.append_plugin_body(plugin_type, body).await
    }

    /// Forward `state` to `SessionAdmin::set_persisted_state`.
    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
        self.control.set_persisted_state(state).await
    }

    /// Forward `target_leaf` to `SessionAdmin::branch_to_node` and return the
    /// snapshot it yields.
    ///
    /// NOTE(review): the meaning of `None` is defined by the callee — confirm.
    pub async fn branch_to_node(
        &self,
        target_leaf: Option<String>,
    ) -> Result<lash_core::SessionSnapshot> {
        self.control.branch_to_node(target_leaf).await
    }

    /// Return the state produced by `SessionAdmin::persist_current_state`.
    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
        self.control.persist_current_state().await
    }

    /// Return the `SessionStateService` handle from
    /// `SessionAdmin::session_state_service`.
    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
        self.control.session_state_service().await
    }

    /// Return the bytes from `SessionAdmin::snapshot_execution_state`.
    ///
    /// The result may be `Ok(None)`; when that happens is decided by the callee.
    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
        self.control.snapshot_execution_state().await
    }

    /// Forward `bytes` to `SessionAdmin::restore_execution_state`.
    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
        self.control.restore_execution_state(bytes).await
    }

    /// Forward optional `instructions` and the scoped effect controller to
    /// `SessionAdmin::compact_context`.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether a
    /// compaction took place — confirm against `SessionAdmin`.
    pub async fn compact_context(
        &self,
        instructions: Option<String>,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<bool> {
        self.control
            .compact_context(instructions, scoped_effect_controller)
            .await
    }
}
1245
/// Typed entry point for invoking plugin actions on a session.
#[derive(Clone)]
pub struct PluginActions {
    pub(crate) control: SessionAdmin,
}

impl PluginActions {
    /// Invoke the plugin action `Op` with `args`.
    ///
    /// The action is selected purely by the type parameter; the call is
    /// forwarded to `SessionAdmin::call_plugin_action::<Op>` and its
    /// `Op::Output` is returned.
    pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
        self.control.call_plugin_action::<Op>(args).await
    }
}
1256
/// Child-session lifecycle controls, implemented as a facade over `SessionAdmin`.
#[derive(Clone)]
pub struct ChildSessionAdmin {
    control: SessionAdmin,
}

impl ChildSessionAdmin {
    /// Create a session via `SessionAdmin::create_child_session` and return
    /// its handle.
    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
        self.control.create_child_session(request).await
    }

    /// Close the session identified by `session_id` via
    /// `SessionAdmin::close_child_session`.
    pub async fn close_session(&self, session_id: &str) -> Result<()> {
        self.control.close_child_session(session_id).await
    }

    /// Activate the managed session identified by `session_id` via
    /// `SessionAdmin::activate_managed_session`.
    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
        self.control.activate_managed_session(session_id).await
    }
}
1275
/// Turn-input injection controls, implemented as a facade over `SessionAdmin`.
#[derive(Clone)]
pub struct InjectionAdmin {
    control: SessionAdmin,
}

impl InjectionAdmin {
    /// Inject one message, optionally tagged with `id`, via
    /// `SessionAdmin::inject_turn_input`.
    pub async fn inject_turn_input(
        &self,
        id: Option<String>,
        message: PluginMessage,
    ) -> Result<()> {
        self.control.inject_turn_input(id, message).await
    }

    /// Inject a batch of inputs via `SessionAdmin::inject_turn_inputs`.
    pub async fn inject_turn_inputs(
        &self,
        messages: Vec<lash_core::InjectedTurnInput>,
    ) -> Result<()> {
        self.control.inject_turn_inputs(messages).await
    }
}
1297
/// Protocol-level session controls, implemented as a facade over `SessionAdmin`.
#[derive(Clone)]
pub struct ProtocolAdmin {
    control: SessionAdmin,
}

impl ProtocolAdmin {
    /// Forward `extension` to `SessionAdmin::apply_protocol_session_extension`.
    pub async fn apply_session_extension(
        &self,
        extension: lash_core::ProtocolSessionExtensionHandle,
    ) -> Result<()> {
        self.control
            .apply_protocol_session_extension(extension)
            .await
    }
}