// lash/admin.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let process_work_poke = self.core.process_work_runner.poke().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            Arc::clone(&self.core.env.core.durability.lashlang_artifact_store),
47            self.core.env.process_registry.clone(),
48            process_work_poke,
49        );
50        router
51            .emit(request, scoped_effect_controller.controller())
52            .await
53            .map_err(Into::into)
54    }
55
56    pub async fn subscriptions(
57        &self,
58        filter: lash_core::TriggerSubscriptionFilter,
59    ) -> Result<Vec<lash_core::TriggerRegistration>> {
60        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
61            EmbedError::Plugin(lash_core::PluginError::Session(
62                "trigger store is unavailable in this runtime".to_string(),
63            ))
64        })?;
65        let records = store.list_subscriptions(filter).await?;
66        Ok(records
67            .iter()
68            .map(lash_core::TriggerRegistration::from)
69            .collect())
70    }
71}
72
73#[derive(Clone)]
74pub struct Processes {
75    pub(crate) core: LashCore,
76}
77
78impl Processes {
79    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
80        self.core
81            .env
82            .process_registry
83            .as_ref()
84            .cloned()
85            .ok_or_else(|| {
86                EmbedError::Plugin(lash_core::PluginError::Session(
87                    "process registry is unavailable in this runtime".to_string(),
88                ))
89            })
90    }
91
92    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
93        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
94    }
95
96    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
97        let effect_id = command.effect_id();
98        lash_core::RuntimeInvocation::effect(
99            lash_core::runtime::RuntimeScope::new("runtime"),
100            effect_id.clone(),
101            lash_core::RuntimeEffectKind::Process,
102            effect_id,
103        )
104    }
105
106    async fn run_command(
107        &self,
108        command: lash_core::ProcessCommand,
109        scoped_effect_controller: ScopedEffectController<'_>,
110    ) -> Result<lash_core::ProcessEffectOutcome> {
111        let registry = self.registry()?;
112        let invocation = Self::process_invocation(&command);
113        let outcome = scoped_effect_controller
114            .controller()
115            .execute_effect(
116                lash_core::RuntimeEffectEnvelope::new(
117                    invocation,
118                    lash_core::RuntimeEffectCommand::process(command),
119                ),
120                lash_core::RuntimeEffectLocalExecutor::processes(registry),
121            )
122            .await
123            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
124        match outcome {
125            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
126            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
127                "process effect returned non-process outcome".to_string(),
128            ))),
129        }
130    }
131
132    pub async fn start(
133        &self,
134        request: lash_core::ProcessStartRequest,
135        scoped_effect_controller: ScopedEffectController<'_>,
136    ) -> Result<lash_core::ProcessRecord> {
137        let env_ref = match request.env_spec.as_ref() {
138            Some(env_spec) => Some(
139                lash_core::runtime::persist_process_execution_env(
140                    self.core
141                        .env
142                        .core
143                        .durability
144                        .lashlang_artifact_store
145                        .as_ref(),
146                    env_spec,
147                )
148                .await?,
149            ),
150            None => None,
151        };
152        let grant = request.grant.clone();
153        let registration = request.into_registration(env_ref);
154        let command = lash_core::ProcessCommand::Start {
155            registration,
156            grant,
157            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
158        };
159        let outcome = self
160            .run_command(command, scoped_effect_controller.clone())
161            .await?;
162        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
163            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
164                "process start returned the wrong outcome".to_string(),
165            )));
166        };
167        if let Some(poke) = self.core.process_work_runner.poke().await {
168            poke.poke();
169        }
170        Ok(record)
171    }
172
173    pub async fn list(
174        &self,
175        filter: &lash_core::ProcessListFilter,
176    ) -> Result<Vec<lash_core::ObservedProcess>> {
177        self.make_observer()?.list(filter).await.map_err(Into::into)
178    }
179
180    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
181        Ok(self.make_observer()?.process(process_id).await)
182    }
183
184    pub async fn events(
185        &self,
186        process_id: &str,
187        after_sequence: u64,
188    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
189        self.make_observer()?
190            .events_after(process_id, after_sequence)
191            .await
192            .map_err(Into::into)
193    }
194
195    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
196        self.registry()?
197            .await_process(process_id)
198            .await
199            .map_err(Into::into)
200    }
201
202    pub async fn cancel(
203        &self,
204        process_id: &str,
205        scoped_effect_controller: ScopedEffectController<'_>,
206    ) -> Result<lash_core::ProcessCancelSummary> {
207        let command = lash_core::ProcessCommand::Cancel {
208            process_id: process_id.to_string(),
209            reason: Some("requested by host".to_string()),
210        };
211        let outcome = self
212            .run_command(command, scoped_effect_controller.clone())
213            .await?;
214        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
215            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
216                "process cancel returned the wrong outcome".to_string(),
217            )));
218        };
219        Ok(lash_core::ProcessCancelSummary::from_record(record))
220    }
221
222    pub async fn signal(
223        &self,
224        process_id: &str,
225        signal_name: impl Into<String>,
226        signal_id: impl Into<String>,
227        request: lash_core::ProcessEventAppendRequest,
228        scoped_effect_controller: ScopedEffectController<'_>,
229    ) -> Result<lash_core::ProcessEvent> {
230        let signal_name = signal_name.into();
231        let event_type = request.event_type.clone();
232        let payload = request.payload.clone();
233        let command = lash_core::ProcessCommand::Signal {
234            process_id: process_id.to_string(),
235            signal_name: signal_name.clone(),
236            signal_id: signal_id.into(),
237            request,
238        };
239        let outcome = self
240            .run_command(command, scoped_effect_controller.clone())
241            .await?;
242        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
243            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
244                "process signal returned the wrong outcome".to_string(),
245            )));
246        };
247        let registry = self.registry()?;
248        let waiting_ordinal =
249            registry
250                .get_process(process_id)
251                .await
252                .and_then(|record| match record.wait {
253                    Some(lash_core::WaitState {
254                        kind:
255                            lash_core::WaitKind::Signal {
256                                name,
257                                event_type: wait_event_type,
258                                ordinal,
259                                ..
260                            },
261                        ..
262                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
263                    _ => None,
264                });
265        let ordinal = match waiting_ordinal {
266            Some(ordinal) => ordinal,
267            None => {
268                registry
269                    .count_events_through(process_id, &event_type, event.sequence)
270                    .await?
271            }
272        };
273        if ordinal > 0 {
274            let key = scoped_effect_controller
275                .controller()
276                .await_event_key(
277                    &lash_core::ExecutionScope::process(process_id),
278                    lash_core::AwaitEventWaitIdentity::process_signal(
279                        process_id,
280                        &signal_name,
281                        ordinal,
282                    ),
283                )
284                .await
285                .map_err(|err| {
286                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
287                })?;
288            let _ = scoped_effect_controller
289                .controller()
290                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
291                .await
292                .map_err(|err| {
293                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
294                })?;
295        }
296        Ok(event)
297    }
298
299    pub async fn session_snapshot(
300        &self,
301        session_id: impl Into<String>,
302    ) -> Result<lash_core::ProcessWorkSnapshot> {
303        self.make_observer()?
304            .snapshot_for_session(session_id)
305            .await
306            .map_err(Into::into)
307    }
308
309    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
310        self.make_observer()
311    }
312}
313
314#[derive(Clone)]
315pub struct SessionAdmin {
316    pub(crate) runtime: RuntimeHandle,
317}
318
319impl SessionAdmin {
320    pub fn config(&self) -> SessionConfigAdmin {
321        SessionConfigAdmin {
322            control: self.clone(),
323        }
324    }
325
326    pub fn tools(&self) -> ToolAdmin {
327        ToolAdmin {
328            control: self.clone(),
329        }
330    }
331
332    pub fn commands(&self) -> SessionCommandAdmin {
333        SessionCommandAdmin {
334            control: self.clone(),
335        }
336    }
337
338    pub fn triggers(&self) -> SessionTriggerAdmin {
339        SessionTriggerAdmin {
340            control: self.clone(),
341        }
342    }
343
344    pub fn state(&self) -> SessionStateAdmin {
345        SessionStateAdmin {
346            control: self.clone(),
347        }
348    }
349
350    pub fn children(&self) -> ChildSessionAdmin {
351        ChildSessionAdmin {
352            control: self.clone(),
353        }
354    }
355
356    pub fn injection(&self) -> InjectionAdmin {
357        InjectionAdmin {
358            control: self.clone(),
359        }
360    }
361
362    pub fn mode(&self) -> ModeAdmin {
363        ModeAdmin {
364            control: self.clone(),
365        }
366    }
367
368    pub fn processes(&self) -> SessionProcessAdmin {
369        SessionProcessAdmin {
370            control: self.clone(),
371        }
372    }
373
374    /// Run `f` against the locked runtime writer, then publish the resulting
375    /// observation. The body is the canonical `lock → call → publish_from`
376    /// stamp shared by nearly every mutating control method; publish happens
377    /// unconditionally once the closure returns.
378    async fn with_writer<F, T>(&self, f: F) -> T
379    where
380        F: AsyncFnOnce(&mut LashRuntime) -> T,
381    {
382        let writer = self.runtime.writer();
383        let mut runtime = writer.lock().await;
384        let value = f(&mut runtime).await;
385        self.runtime.publish_from(&runtime);
386        value
387    }
388
389    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
390        self.update_session_config(patch.provider, patch.model, patch.prompt)
391            .await?;
392        Ok(())
393    }
394
395    async fn update_session_config(
396        &self,
397        provider: Option<ProviderHandle>,
398        model: Option<lash_core::ModelSpec>,
399        prompt: Option<PromptLayer>,
400    ) -> Result<()> {
401        self.with_writer(async |runtime: &mut LashRuntime| {
402            runtime.update_session_config(provider, model, prompt).await;
403        })
404        .await;
405        Ok(())
406    }
407
408    async fn export_state(&self) -> lash_core::SessionSnapshot {
409        self.runtime.observe().read_view.to_snapshot()
410    }
411
412    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
413        self.with_writer(async |runtime: &mut LashRuntime| {
414            runtime
415                .append_session_nodes(lash_core::AppendSessionNodesRequest {
416                    nodes: messages
417                        .into_iter()
418                        .map(lash_core::SessionAppendNode::message)
419                        .collect(),
420                    requires_ancestor_node_id: None,
421                })
422                .await
423                .map(|_| ())
424                .map_err(Into::into)
425        })
426        .await
427    }
428
429    async fn append_plugin_body(
430        &self,
431        plugin_type: impl Into<String>,
432        body: serde_json::Value,
433    ) -> Result<()> {
434        self.with_writer(async |runtime: &mut LashRuntime| {
435            runtime
436                .append_session_nodes(lash_core::AppendSessionNodesRequest {
437                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
438                    requires_ancestor_node_id: None,
439                })
440                .await
441                .map(|_| ())
442                .map_err(Into::into)
443        })
444        .await
445    }
446
447    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
448        self.with_writer(async |runtime: &mut LashRuntime| {
449            runtime.set_persisted_state(state).map_err(Into::into)
450        })
451        .await
452    }
453
454    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
455        self.with_writer(async |runtime: &mut LashRuntime| {
456            runtime.set_prompt_template(template).await;
457        })
458        .await;
459        Ok(())
460    }
461
462    async fn clear_prompt_template(&self) -> Result<()> {
463        self.with_writer(async |runtime: &mut LashRuntime| {
464            runtime.clear_prompt_template().await;
465        })
466        .await;
467        Ok(())
468    }
469
470    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
471        self.with_writer(async |runtime: &mut LashRuntime| {
472            runtime.add_prompt_contribution(contribution).await;
473        })
474        .await;
475        Ok(())
476    }
477
478    async fn replace_prompt_slot(
479        &self,
480        slot: PromptSlot,
481        contributions: impl IntoIterator<Item = PromptContribution>,
482    ) -> Result<()> {
483        self.with_writer(async |runtime: &mut LashRuntime| {
484            runtime.replace_prompt_slot(slot, contributions).await;
485        })
486        .await;
487        Ok(())
488    }
489
490    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
491        self.with_writer(async |runtime: &mut LashRuntime| {
492            runtime.clear_prompt_slot(slot).await;
493        })
494        .await;
495        Ok(())
496    }
497
498    async fn apply_protocol_session_extension(
499        &self,
500        extension: lash_core::ProtocolSessionExtensionHandle,
501    ) -> Result<()> {
502        self.with_writer(async |runtime: &mut LashRuntime| {
503            runtime
504                .apply_protocol_session_extension(extension)
505                .await
506                .map_err(Into::into)
507        })
508        .await
509    }
510
511    async fn branch_to_node(
512        &self,
513        target_leaf: Option<String>,
514    ) -> Result<lash_core::SessionSnapshot> {
515        self.with_writer(async |runtime: &mut LashRuntime| {
516            runtime
517                .branch_to_node(target_leaf)
518                .await
519                .map_err(Into::into)
520        })
521        .await
522    }
523
524    async fn await_background_work(&self) -> Result<()> {
525        self.with_writer(async |runtime: &mut LashRuntime| {
526            runtime.await_background_work().await.map_err(Into::into)
527        })
528        .await
529    }
530
531    async fn refresh_tool_catalog(&self) -> Result<()> {
532        self.with_writer(async |runtime: &mut LashRuntime| {
533            runtime
534                .refresh_session_tool_catalog()
535                .await
536                .map_err(Into::into)
537        })
538        .await
539    }
540
541    async fn submit_session_command(
542        &self,
543        command: lash_core::SessionCommand,
544        idempotency_key: impl Into<String>,
545    ) -> Result<lash_core::SessionCommandReceipt> {
546        let idempotency_key = idempotency_key.into();
547        self.with_writer(async |runtime: &mut LashRuntime| {
548            runtime
549                .submit_session_command(command, idempotency_key)
550                .await
551                .map_err(Into::into)
552        })
553        .await
554    }
555
556    async fn list_lashlang_trigger_registrations(
557        &self,
558    ) -> Result<Vec<lash_core::TriggerRegistration>> {
559        self.with_writer(async |runtime: &mut LashRuntime| {
560            runtime
561                .list_lashlang_trigger_registrations()
562                .await
563                .map_err(Into::into)
564        })
565        .await
566    }
567
568    async fn lashlang_trigger_registrations_by_source_type(
569        &self,
570        source_type: impl Into<lash_core::TriggerEventType>,
571    ) -> Result<Vec<lash_core::TriggerRegistration>> {
572        self.with_writer(async |runtime: &mut LashRuntime| {
573            runtime
574                .lashlang_trigger_registrations_by_source_type(source_type)
575                .await
576                .map_err(Into::into)
577        })
578        .await
579    }
580
581    async fn invoke_plugin_action(
582        &self,
583        name: &str,
584        args: serde_json::Value,
585    ) -> Result<ToolResult> {
586        let session_id = self.runtime.observe().session_id().to_string();
587        let writer = self.runtime.writer();
588        writer
589            .lock()
590            .await
591            .invoke_plugin_action(name, args, Some(session_id))
592            .await
593            .map_err(Into::into)
594    }
595
596    async fn call_plugin_action<Op: lash_core::PluginAction>(
597        &self,
598        args: Op::Args,
599    ) -> Result<Op::Output> {
600        let result = self
601            .invoke_plugin_action(
602                Op::NAME,
603                serde_json::to_value(args).map_err(|err| {
604                    EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
605                        "invalid {} args: {err}",
606                        Op::NAME
607                    )))
608                })?,
609            )
610            .await?;
611        let Some(output) = result.as_done_output() else {
612            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
613                "{} returned a pending result where completed output is required",
614                Op::NAME
615            ))));
616        };
617        if !output.is_success() {
618            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
619                "{} failed: {}",
620                Op::NAME,
621                output.value_for_projection()
622            ))));
623        }
624        serde_json::from_value(output.value_for_projection()).map_err(|err| {
625            EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
626                "invalid {} output: {err}",
627                Op::NAME
628            )))
629        })
630    }
631
632    async fn compact_context(
633        &self,
634        instructions: Option<String>,
635        scoped_effect_controller: ScopedEffectController<'_>,
636    ) -> Result<bool> {
637        self.with_writer(async |runtime: &mut LashRuntime| {
638            runtime
639                .compact_context(instructions, scoped_effect_controller)
640                .await
641                .map_err(Into::into)
642        })
643        .await
644    }
645
646    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
647        self.with_writer(async |runtime: &mut LashRuntime| {
648            runtime.await_background_work().await?;
649            Ok(runtime.export_persisted_state())
650        })
651        .await
652    }
653
654    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
655        Ok(self.runtime.observe().list_process_handles().await)
656    }
657
658    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
659        Ok(self.runtime.observe().list_all_process_handles().await)
660    }
661
662    async fn start_process(
663        &self,
664        request: lash_core::ProcessStartRequest,
665        scoped_effect_controller: ScopedEffectController<'_>,
666    ) -> Result<lash_core::ProcessHandleSummary> {
667        let writer = self.runtime.writer();
668        let runtime = writer.lock().await;
669        let session_id = runtime.session_id().to_string();
670        let processes = runtime.process_service()?;
671        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
672        let summary = processes
673            .start_from_request(&session_id, request, scope)
674            .await
675            .map_err(EmbedError::Plugin)?;
676        self.runtime.record_process_changed(
677            SessionProcessEventKind::Started,
678            vec![summary.process_id.clone()],
679        );
680        Ok(summary)
681    }
682
683    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
684        self.runtime
685            .writer()
686            .lock()
687            .await
688            .session_state_service()
689            .map_err(Into::into)
690    }
691
692    async fn cancel_process(
693        &self,
694        process_id: &str,
695        scoped_effect_controller: ScopedEffectController<'_>,
696    ) -> Result<lash_core::ProcessCancelSummary> {
697        let writer = self.runtime.writer();
698        let runtime = writer.lock().await;
699        let session_id = runtime.session_id().to_string();
700        let processes = runtime.process_service()?;
701        let cancel_ability = runtime.process_cancel_ability();
702        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703        let summary = cancel_ability
704            .cancel_summary(
705                processes.as_ref(),
706                lash_core::ProcessCancelRequest::new(
707                    &session_id,
708                    process_id,
709                    scope,
710                    lash_core::ProcessCancelSource::HostApi,
711                )
712                .with_reason("requested by host API"),
713            )
714            .await
715            .map_err(EmbedError::Plugin)?;
716        self.runtime.record_process_changed(
717            SessionProcessEventKind::Cancelled,
718            vec![summary.process_id.clone()],
719        );
720        Ok(summary)
721    }
722
723    async fn cancel_visible_processes(
724        &self,
725        scoped_effect_controller: ScopedEffectController<'_>,
726    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
727        let writer = self.runtime.writer();
728        let runtime = writer.lock().await;
729        let session_id = runtime.session_id().to_string();
730        let processes = runtime.process_service()?;
731        let cancel_ability = runtime.process_cancel_ability();
732        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
733        let summaries = cancel_ability
734            .cancel_all_visible(
735                processes.as_ref(),
736                lash_core::ProcessCancelAllRequest::new(
737                    &session_id,
738                    scope,
739                    lash_core::ProcessCancelSource::HostApi,
740                )
741                .with_reason("requested by host API"),
742            )
743            .await
744            .map_err(EmbedError::Plugin)?;
745        self.runtime.record_process_changed(
746            SessionProcessEventKind::Cancelled,
747            summaries
748                .iter()
749                .map(|summary| summary.process_id.clone())
750                .collect(),
751        );
752        Ok(summaries)
753    }
754
755    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
756        self.with_writer(async |runtime: &mut LashRuntime| {
757            runtime.snapshot_execution_state().await.map_err(Into::into)
758        })
759        .await
760    }
761
762    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
763        self.with_writer(async |runtime: &mut LashRuntime| {
764            runtime
765                .restore_execution_state(bytes)
766                .await
767                .map_err(Into::into)
768        })
769        .await
770    }
771
772    async fn tool_state(&self) -> Result<ToolState> {
773        self.runtime.observe().tool_state.clone().ok_or_else(|| {
774            EmbedError::Session(SessionError::Protocol(
775                "runtime session not available".to_string(),
776            ))
777        })
778    }
779
780    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
781        self.with_writer(async |runtime: &mut LashRuntime| {
782            runtime
783                .apply_tool_state(state)
784                .await
785                .map_err(EmbedError::from)
786        })
787        .await
788    }
789
790    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
791        self.with_writer(async |runtime: &mut LashRuntime| {
792            runtime
793                .restore_tool_state(state)
794                .await
795                .map_err(EmbedError::from)
796        })
797        .await
798    }
799
800    async fn set_tool_availability(
801        &self,
802        name: &str,
803        availability: ToolAvailability,
804    ) -> Result<u64> {
805        self.set_tool_availability_many(&[(name, availability)])
806            .await
807    }
808
809    async fn set_tool_availability_many<N: AsRef<str>>(
810        &self,
811        updates: &[(N, ToolAvailability)],
812    ) -> Result<u64> {
813        let mut state = self.tool_state().await?;
814        for (name, availability) in updates {
815            state
816                .set_availability(name.as_ref(), Some(*availability))
817                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
818        }
819        self.apply_tool_state(state).await
820    }
821
822    async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
823        let mut state = self.tool_state().await?;
824        state
825            .set_availability(name, None)
826            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
827        self.apply_tool_state(state).await
828    }
829
830    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
831        Ok(self.tool_state().await?.tool_manifests())
832    }
833
834    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
835        let tool_registry = self.tool_registry().await?;
836        let handle = tool_registry
837            .add_tool_provider(provider)
838            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
839        self.refresh_tool_catalog().await?;
840        Ok(handle)
841    }
842
843    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
844        let tool_registry = self.tool_registry().await?;
845        let generation = tool_registry
846            .remove_source(handle)
847            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
848        self.refresh_tool_catalog().await?;
849        Ok(generation)
850    }
851
852    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
853        let writer = self.runtime.writer();
854        let runtime = writer.lock().await;
855        let lifecycle = runtime.session_lifecycle_service()?;
856        lifecycle.create_session(request).await.map_err(Into::into)
857    }
858
859    async fn close_child_session(&self, session_id: &str) -> Result<()> {
860        let writer = self.runtime.writer();
861        let runtime = writer.lock().await;
862        let lifecycle = runtime.session_lifecycle_service()?;
863        lifecycle
864            .close_session(session_id)
865            .await
866            .map_err(Into::into)
867    }
868
869    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
870        self.with_writer(async |runtime: &mut LashRuntime| {
871            runtime
872                .activate_managed_session(session_id)
873                .await
874                .map_err(Into::into)
875        })
876        .await
877    }
878
879    async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
880        self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
881            .await
882    }
883
884    async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
885        for input in messages {
886            let source_key = input.id.map(|id| format!("injection:{id}"));
887            let turn_input = turn_input_from_plugin_message(input.message);
888            self.runtime
889                .enqueue_turn_input(
890                    turn_input,
891                    lash_core::DeliveryPolicy::EarliestSafeBoundary,
892                    lash_core::SlotPolicy::Join,
893                    source_key,
894                )
895                .await
896                .map(|_| ())
897                .map_err(EmbedError::Runtime)?;
898        }
899        Ok(())
900    }
901
902    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
903        self.runtime
904            .writer()
905            .lock()
906            .await
907            .plugin_session()
908            .map(|session| session.tool_registry())
909            .ok_or_else(|| {
910                EmbedError::Session(SessionError::Protocol(
911                    "tool registry is unavailable in this runtime session".to_string(),
912                ))
913            })
914    }
915}
916
917fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
918    let mut input = TurnInput::empty();
919    if !message.content.is_empty() {
920        input.items.push(InputItem::Text {
921            text: message.content,
922        });
923    }
924    for (index, bytes) in message.images.into_iter().enumerate() {
925        let id = format!("injected-image-{index}");
926        input.items.push(InputItem::ImageRef { id: id.clone() });
927        input.image_blobs.insert(id, bytes);
928    }
929    input
930}
931
932#[derive(Clone)]
933pub struct SessionConfigAdmin {
934    control: SessionAdmin,
935}
936
937impl SessionConfigAdmin {
938    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
939        self.control.update_config(patch).await
940    }
941
942    pub async fn update_session_config(
943        &self,
944        provider: Option<ProviderHandle>,
945        model: Option<lash_core::ModelSpec>,
946        prompt: Option<PromptLayer>,
947    ) -> Result<()> {
948        self.control
949            .update_session_config(provider, model, prompt)
950            .await
951    }
952
953    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
954        self.control.set_prompt_template(template).await
955    }
956
957    pub async fn clear_prompt_template(&self) -> Result<()> {
958        self.control.clear_prompt_template().await
959    }
960
961    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
962        self.control.add_prompt_contribution(contribution).await
963    }
964
965    pub async fn replace_prompt_slot(
966        &self,
967        slot: PromptSlot,
968        contributions: impl IntoIterator<Item = PromptContribution>,
969    ) -> Result<()> {
970        self.control.replace_prompt_slot(slot, contributions).await
971    }
972
973    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
974        self.control.clear_prompt_slot(slot).await
975    }
976}
977
978#[derive(Clone)]
979pub struct ToolAdmin {
980    control: SessionAdmin,
981}
982
983impl ToolAdmin {
984    pub(crate) fn new(control: SessionAdmin) -> Self {
985        Self { control }
986    }
987}
988
989impl ToolAdmin {
990    pub async fn state(&self) -> Result<ToolState> {
991        self.control.tool_state().await
992    }
993
994    pub fn advanced(&self) -> AdvancedToolAdmin {
995        AdvancedToolAdmin {
996            control: self.control.clone(),
997        }
998    }
999
1000    pub async fn set_availability(
1001        &self,
1002        name: impl AsRef<str>,
1003        availability: ToolAvailability,
1004    ) -> Result<u64> {
1005        self.control
1006            .set_tool_availability(name.as_ref(), availability)
1007            .await
1008    }
1009
1010    pub async fn set_availability_many<N: AsRef<str>>(
1011        &self,
1012        updates: &[(N, ToolAvailability)],
1013    ) -> Result<u64> {
1014        self.control.set_tool_availability_many(updates).await
1015    }
1016
1017    pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
1018        self.control
1019            .clear_tool_availability_override(name.as_ref())
1020            .await
1021    }
1022
1023    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1024        self.control.active_tool_manifests().await
1025    }
1026
1027    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1028        self.control.add_tool_provider(provider).await
1029    }
1030
1031    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1032        self.control.remove_tool_source(handle).await
1033    }
1034}
1035
1036#[derive(Clone)]
1037pub struct AdvancedToolAdmin {
1038    control: SessionAdmin,
1039}
1040
1041impl AdvancedToolAdmin {
1042    /// Replace the entire tool-state snapshot.
1043    ///
1044    /// This is a generation-checked escape hatch for hosts that intentionally
1045    /// edit the full snapshot. Prefer `ToolAdmin` availability methods for
1046    /// ordinary tool policy changes.
1047    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1048        self.control.apply_tool_state(state).await
1049    }
1050
1051    /// Restore a persisted tool-state snapshot, adopting its generation.
1052    ///
1053    /// Use this when re-applying a snapshot read from durable storage (session
1054    /// resume), not an edited delta: it reconstructs the exact persisted surface
1055    /// idempotently rather than requiring the snapshot to match the current
1056    /// generation. A cold resume of a session whose surface reached generation
1057    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
1058    ///
1059    /// Persisted tools whose source is not currently registered (e.g. a
1060    /// detached MCP server) do not fail the restore: they are kept as orphans,
1061    /// forced `Off`, listed in the returned [`ToolRestoreReport`], and rebind
1062    /// automatically when a source re-advertises the same tool.
1063    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1064        self.control.restore_tool_state(state).await
1065    }
1066}
1067
1068#[derive(Clone)]
1069pub struct SessionCommandAdmin {
1070    control: SessionAdmin,
1071}
1072
1073impl SessionCommandAdmin {
1074    /// Enqueue an unconditional tool-catalog refresh. The command drains
1075    /// asynchronously and recomputes the surface from live sources, so it
1076    /// takes no generation guard — any generation observed at enqueue time
1077    /// could legitimately have advanced by drain time.
1078    pub async fn refresh_tool_catalog(
1079        &self,
1080        reason: impl Into<String>,
1081        idempotency_key: impl Into<String>,
1082    ) -> Result<lash_core::SessionCommandReceipt> {
1083        self.control
1084            .submit_session_command(
1085                lash_core::SessionCommand::RefreshToolCatalog {
1086                    reason: reason.into(),
1087                },
1088                idempotency_key,
1089            )
1090            .await
1091    }
1092
1093    pub async fn reset(
1094        &self,
1095        reason: impl Into<String>,
1096        idempotency_key: impl Into<String>,
1097    ) -> Result<lash_core::SessionCommandReceipt> {
1098        self.control
1099            .submit_session_command(
1100                lash_core::SessionCommand::ResetSession {
1101                    reason: reason.into(),
1102                },
1103                idempotency_key,
1104            )
1105            .await
1106    }
1107}
1108
1109/// Session-scoped read controls for Lashlang trigger registrations.
1110#[derive(Clone)]
1111pub struct SessionTriggerAdmin {
1112    control: SessionAdmin,
1113}
1114
1115impl SessionTriggerAdmin {
1116    /// Return every trigger registration in the session.
1117    ///
1118    /// This is an admin/introspection view. Source owners should prefer
1119    /// [`Self::by_source_type`] so they only inspect registrations for the
1120    /// concrete source type they own.
1121    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1122        self.control.list_lashlang_trigger_registrations().await
1123    }
1124
1125    /// Return registrations whose source value has the given host descriptor type.
1126    ///
1127    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1128    /// source uses it to inspect registrations for keys it may schedule and emit.
1129    pub async fn by_source_type(
1130        &self,
1131        source_type: impl Into<lash_core::TriggerEventType>,
1132    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1133        self.control
1134            .lashlang_trigger_registrations_by_source_type(source_type)
1135            .await
1136    }
1137}
1138
1139#[derive(Clone)]
1140pub struct SessionProcessAdmin {
1141    control: SessionAdmin,
1142}
1143
1144impl SessionProcessAdmin {
1145    pub(crate) fn new(control: SessionAdmin) -> Self {
1146        Self { control }
1147    }
1148
1149    pub async fn start(
1150        &self,
1151        request: lash_core::ProcessStartRequest,
1152        scoped_effect_controller: ScopedEffectController<'_>,
1153    ) -> Result<lash_core::ProcessHandleSummary> {
1154        self.control
1155            .start_process(request, scoped_effect_controller)
1156            .await
1157    }
1158
1159    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1160        self.control.list_process_handles().await
1161    }
1162
1163    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1164        self.control.list_all_process_handles().await
1165    }
1166
1167    pub async fn await_all(&self) -> Result<()> {
1168        self.control.await_background_work().await
1169    }
1170
1171    pub async fn cancel(
1172        &self,
1173        process_id: &str,
1174        scoped_effect_controller: ScopedEffectController<'_>,
1175    ) -> Result<lash_core::ProcessCancelSummary> {
1176        self.control
1177            .cancel_process(process_id, scoped_effect_controller)
1178            .await
1179    }
1180
1181    pub async fn cancel_all(
1182        &self,
1183        scoped_effect_controller: ScopedEffectController<'_>,
1184    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1185        self.control
1186            .cancel_visible_processes(scoped_effect_controller)
1187            .await
1188    }
1189}
1190
1191#[derive(Clone)]
1192pub struct SessionStateAdmin {
1193    control: SessionAdmin,
1194}
1195
1196impl SessionStateAdmin {
1197    pub async fn export(&self) -> lash_core::SessionSnapshot {
1198        self.control.export_state().await
1199    }
1200
1201    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1202        self.control.append_messages(messages).await
1203    }
1204
1205    pub async fn append_plugin_body(
1206        &self,
1207        plugin_type: impl Into<String>,
1208        body: serde_json::Value,
1209    ) -> Result<()> {
1210        self.control.append_plugin_body(plugin_type, body).await
1211    }
1212
1213    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1214        self.control.set_persisted_state(state).await
1215    }
1216
1217    pub async fn branch_to_node(
1218        &self,
1219        target_leaf: Option<String>,
1220    ) -> Result<lash_core::SessionSnapshot> {
1221        self.control.branch_to_node(target_leaf).await
1222    }
1223
1224    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1225        self.control.persist_current_state().await
1226    }
1227
1228    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1229        self.control.session_state_service().await
1230    }
1231
1232    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1233        self.control.snapshot_execution_state().await
1234    }
1235
1236    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1237        self.control.restore_execution_state(bytes).await
1238    }
1239
1240    pub async fn compact_context(
1241        &self,
1242        instructions: Option<String>,
1243        scoped_effect_controller: ScopedEffectController<'_>,
1244    ) -> Result<bool> {
1245        self.control
1246            .compact_context(instructions, scoped_effect_controller)
1247            .await
1248    }
1249}
1250
1251#[derive(Clone)]
1252pub struct PluginActions {
1253    pub(crate) control: SessionAdmin,
1254}
1255
1256impl PluginActions {
1257    pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1258        self.control.call_plugin_action::<Op>(args).await
1259    }
1260}
1261
1262#[derive(Clone)]
1263pub struct ChildSessionAdmin {
1264    control: SessionAdmin,
1265}
1266
1267impl ChildSessionAdmin {
1268    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1269        self.control.create_child_session(request).await
1270    }
1271
1272    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1273        self.control.close_child_session(session_id).await
1274    }
1275
1276    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1277        self.control.activate_managed_session(session_id).await
1278    }
1279}
1280
1281#[derive(Clone)]
1282pub struct InjectionAdmin {
1283    control: SessionAdmin,
1284}
1285
1286impl InjectionAdmin {
1287    pub async fn inject_turn_input(
1288        &self,
1289        id: Option<String>,
1290        message: PluginMessage,
1291    ) -> Result<()> {
1292        self.control.inject_turn_input(id, message).await
1293    }
1294
1295    pub async fn inject_turn_inputs(
1296        &self,
1297        messages: Vec<lash_core::InjectedTurnInput>,
1298    ) -> Result<()> {
1299        self.control.inject_turn_inputs(messages).await
1300    }
1301}
1302
1303#[derive(Clone)]
1304pub struct ModeAdmin {
1305    control: SessionAdmin,
1306}
1307
1308impl ModeAdmin {
1309    pub async fn apply_session_extension(
1310        &self,
1311        extension: lash_core::ProtocolSessionExtensionHandle,
1312    ) -> Result<()> {
1313        self.control
1314            .apply_protocol_session_extension(extension)
1315            .await
1316    }
1317}