Skip to main content

lash/
admin.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let process_work_poke = self.core.process_work_runner.poke().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            Arc::clone(&self.core.env.core.durability.lashlang_artifact_store),
47            self.core.env.process_registry.clone(),
48            process_work_poke,
49            self.core.env.core.profile.host_profile_id.clone(),
50        );
51        router
52            .emit(request, scoped_effect_controller.controller())
53            .await
54            .map_err(Into::into)
55    }
56
57    pub async fn subscriptions(
58        &self,
59        filter: lash_core::TriggerSubscriptionFilter,
60    ) -> Result<Vec<lash_core::TriggerRegistration>> {
61        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
62            EmbedError::Plugin(lash_core::PluginError::Session(
63                "trigger store is unavailable in this runtime".to_string(),
64            ))
65        })?;
66        let records = store.list_subscriptions(filter).await?;
67        Ok(records
68            .iter()
69            .map(lash_core::TriggerRegistration::from)
70            .collect())
71    }
72}
73
74#[derive(Clone)]
75pub struct Processes {
76    pub(crate) core: LashCore,
77}
78
79impl Processes {
80    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
81        self.core
82            .env
83            .process_registry
84            .as_ref()
85            .cloned()
86            .ok_or_else(|| {
87                EmbedError::Plugin(lash_core::PluginError::Session(
88                    "process registry is unavailable in this runtime".to_string(),
89                ))
90            })
91    }
92
93    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
94        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
95    }
96
97    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
98        let effect_id = command.effect_id();
99        lash_core::RuntimeInvocation::effect(
100            lash_core::runtime::RuntimeScope::new("runtime"),
101            effect_id.clone(),
102            lash_core::RuntimeEffectKind::Process,
103            effect_id,
104        )
105    }
106
107    async fn run_command(
108        &self,
109        command: lash_core::ProcessCommand,
110        scoped_effect_controller: ScopedEffectController<'_>,
111    ) -> Result<lash_core::ProcessEffectOutcome> {
112        let registry = self.registry()?;
113        let invocation = Self::process_invocation(&command);
114        let outcome = scoped_effect_controller
115            .controller()
116            .execute_effect(
117                lash_core::RuntimeEffectEnvelope::new(
118                    invocation,
119                    lash_core::RuntimeEffectCommand::process(command),
120                ),
121                lash_core::RuntimeEffectLocalExecutor::processes(registry),
122            )
123            .await
124            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
125        match outcome {
126            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
127            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
128                "process effect returned non-process outcome".to_string(),
129            ))),
130        }
131    }
132
133    pub async fn start(
134        &self,
135        request: lash_core::ProcessStartRequest,
136        scoped_effect_controller: ScopedEffectController<'_>,
137    ) -> Result<lash_core::ProcessRecord> {
138        let env_ref = match request.env_spec.as_ref() {
139            Some(env_spec) => Some(
140                lash_core::runtime::persist_process_execution_env(
141                    self.core
142                        .env
143                        .core
144                        .durability
145                        .lashlang_artifact_store
146                        .as_ref(),
147                    env_spec,
148                )
149                .await?,
150            ),
151            None => None,
152        };
153        let grant = request.grant.clone();
154        let registration =
155            request.into_registration(self.core.env.core.profile.host_profile_id.clone(), env_ref);
156        let command = lash_core::ProcessCommand::Start {
157            registration,
158            grant,
159            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
160        };
161        let outcome = self
162            .run_command(command, scoped_effect_controller.clone())
163            .await?;
164        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
165            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
166                "process start returned the wrong outcome".to_string(),
167            )));
168        };
169        if let Some(poke) = self.core.process_work_runner.poke().await {
170            poke.poke();
171        }
172        Ok(record)
173    }
174
175    pub async fn list(
176        &self,
177        filter: &lash_core::ProcessListFilter,
178    ) -> Result<Vec<lash_core::ObservedProcess>> {
179        self.make_observer()?.list(filter).await.map_err(Into::into)
180    }
181
182    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
183        Ok(self.make_observer()?.process(process_id).await)
184    }
185
186    pub async fn events(
187        &self,
188        process_id: &str,
189        after_sequence: u64,
190    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
191        self.make_observer()?
192            .events_after(process_id, after_sequence)
193            .await
194            .map_err(Into::into)
195    }
196
197    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
198        self.registry()?
199            .await_process(process_id)
200            .await
201            .map_err(Into::into)
202    }
203
204    pub async fn cancel(
205        &self,
206        process_id: &str,
207        scoped_effect_controller: ScopedEffectController<'_>,
208    ) -> Result<lash_core::ProcessCancelSummary> {
209        let command = lash_core::ProcessCommand::Cancel {
210            process_id: process_id.to_string(),
211            reason: Some("requested by host".to_string()),
212        };
213        let outcome = self
214            .run_command(command, scoped_effect_controller.clone())
215            .await?;
216        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
217            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
218                "process cancel returned the wrong outcome".to_string(),
219            )));
220        };
221        Ok(lash_core::ProcessCancelSummary::from_record(record))
222    }
223
224    pub async fn signal(
225        &self,
226        process_id: &str,
227        signal_name: impl Into<String>,
228        signal_id: impl Into<String>,
229        request: lash_core::ProcessEventAppendRequest,
230        scoped_effect_controller: ScopedEffectController<'_>,
231    ) -> Result<lash_core::ProcessEvent> {
232        let signal_name = signal_name.into();
233        let event_type = request.event_type.clone();
234        let payload = request.payload.clone();
235        let command = lash_core::ProcessCommand::Signal {
236            process_id: process_id.to_string(),
237            signal_name: signal_name.clone(),
238            signal_id: signal_id.into(),
239            request,
240        };
241        let outcome = self
242            .run_command(command, scoped_effect_controller.clone())
243            .await?;
244        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
245            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
246                "process signal returned the wrong outcome".to_string(),
247            )));
248        };
249        let registry = self.registry()?;
250        let waiting_ordinal =
251            registry
252                .get_process(process_id)
253                .await
254                .and_then(|record| match record.wait {
255                    Some(lash_core::WaitState {
256                        kind:
257                            lash_core::WaitKind::Signal {
258                                name,
259                                event_type: wait_event_type,
260                                ordinal,
261                                ..
262                            },
263                        ..
264                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
265                    _ => None,
266                });
267        let ordinal = match waiting_ordinal {
268            Some(ordinal) => ordinal,
269            None => {
270                registry
271                    .count_events_through(process_id, &event_type, event.sequence)
272                    .await?
273            }
274        };
275        if ordinal > 0 {
276            let key = scoped_effect_controller
277                .controller()
278                .await_event_key(
279                    &lash_core::ExecutionScope::process(process_id),
280                    lash_core::AwaitEventWaitIdentity::process_signal(
281                        process_id,
282                        &signal_name,
283                        ordinal,
284                    ),
285                )
286                .await
287                .map_err(|err| {
288                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
289                })?;
290            let _ = scoped_effect_controller
291                .controller()
292                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
293                .await
294                .map_err(|err| {
295                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
296                })?;
297        }
298        Ok(event)
299    }
300
301    pub async fn session_snapshot(
302        &self,
303        session_id: impl Into<String>,
304    ) -> Result<lash_core::ProcessWorkSnapshot> {
305        self.make_observer()?
306            .snapshot_for_session(session_id)
307            .await
308            .map_err(Into::into)
309    }
310
311    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
312        self.make_observer()
313    }
314}
315
316#[derive(Clone)]
317pub struct SessionAdmin {
318    pub(crate) runtime: RuntimeHandle,
319}
320
321impl SessionAdmin {
322    pub fn config(&self) -> SessionConfigAdmin {
323        SessionConfigAdmin {
324            control: self.clone(),
325        }
326    }
327
328    pub fn tools(&self) -> ToolAdmin {
329        ToolAdmin {
330            control: self.clone(),
331        }
332    }
333
334    pub fn commands(&self) -> SessionCommandAdmin {
335        SessionCommandAdmin {
336            control: self.clone(),
337        }
338    }
339
340    pub fn triggers(&self) -> SessionTriggerAdmin {
341        SessionTriggerAdmin {
342            control: self.clone(),
343        }
344    }
345
346    pub fn state(&self) -> SessionStateAdmin {
347        SessionStateAdmin {
348            control: self.clone(),
349        }
350    }
351
352    pub fn children(&self) -> ChildSessionAdmin {
353        ChildSessionAdmin {
354            control: self.clone(),
355        }
356    }
357
358    pub fn injection(&self) -> InjectionAdmin {
359        InjectionAdmin {
360            control: self.clone(),
361        }
362    }
363
364    pub fn mode(&self) -> ModeAdmin {
365        ModeAdmin {
366            control: self.clone(),
367        }
368    }
369
370    pub fn processes(&self) -> SessionProcessAdmin {
371        SessionProcessAdmin {
372            control: self.clone(),
373        }
374    }
375
376    /// Run `f` against the locked runtime writer, then publish the resulting
377    /// observation. The body is the canonical `lock → call → publish_from`
378    /// stamp shared by nearly every mutating control method; publish happens
379    /// unconditionally once the closure returns.
380    async fn with_writer<F, T>(&self, f: F) -> T
381    where
382        F: AsyncFnOnce(&mut LashRuntime) -> T,
383    {
384        let writer = self.runtime.writer();
385        let mut runtime = writer.lock().await;
386        let value = f(&mut runtime).await;
387        self.runtime.publish_from(&runtime);
388        value
389    }
390
391    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
392        self.update_session_config(patch.provider, patch.model, patch.prompt)
393            .await?;
394        Ok(())
395    }
396
397    async fn update_session_config(
398        &self,
399        provider: Option<ProviderHandle>,
400        model: Option<lash_core::ModelSpec>,
401        prompt: Option<PromptLayer>,
402    ) -> Result<()> {
403        self.with_writer(async |runtime: &mut LashRuntime| {
404            runtime.update_session_config(provider, model, prompt).await;
405        })
406        .await;
407        Ok(())
408    }
409
410    async fn export_state(&self) -> lash_core::SessionSnapshot {
411        self.runtime.observe().read_view.to_snapshot()
412    }
413
414    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
415        self.with_writer(async |runtime: &mut LashRuntime| {
416            runtime
417                .append_session_nodes(lash_core::AppendSessionNodesRequest {
418                    nodes: messages
419                        .into_iter()
420                        .map(lash_core::SessionAppendNode::message)
421                        .collect(),
422                    requires_ancestor_node_id: None,
423                })
424                .await
425                .map(|_| ())
426                .map_err(Into::into)
427        })
428        .await
429    }
430
431    async fn append_plugin_body(
432        &self,
433        plugin_type: impl Into<String>,
434        body: serde_json::Value,
435    ) -> Result<()> {
436        self.with_writer(async |runtime: &mut LashRuntime| {
437            runtime
438                .append_session_nodes(lash_core::AppendSessionNodesRequest {
439                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
440                    requires_ancestor_node_id: None,
441                })
442                .await
443                .map(|_| ())
444                .map_err(Into::into)
445        })
446        .await
447    }
448
449    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
450        self.with_writer(async |runtime: &mut LashRuntime| {
451            runtime.set_persisted_state(state).map_err(Into::into)
452        })
453        .await
454    }
455
456    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
457        self.with_writer(async |runtime: &mut LashRuntime| {
458            runtime.set_prompt_template(template).await;
459        })
460        .await;
461        Ok(())
462    }
463
464    async fn clear_prompt_template(&self) -> Result<()> {
465        self.with_writer(async |runtime: &mut LashRuntime| {
466            runtime.clear_prompt_template().await;
467        })
468        .await;
469        Ok(())
470    }
471
472    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
473        self.with_writer(async |runtime: &mut LashRuntime| {
474            runtime.add_prompt_contribution(contribution).await;
475        })
476        .await;
477        Ok(())
478    }
479
480    async fn replace_prompt_slot(
481        &self,
482        slot: PromptSlot,
483        contributions: impl IntoIterator<Item = PromptContribution>,
484    ) -> Result<()> {
485        self.with_writer(async |runtime: &mut LashRuntime| {
486            runtime.replace_prompt_slot(slot, contributions).await;
487        })
488        .await;
489        Ok(())
490    }
491
492    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
493        self.with_writer(async |runtime: &mut LashRuntime| {
494            runtime.clear_prompt_slot(slot).await;
495        })
496        .await;
497        Ok(())
498    }
499
500    async fn apply_protocol_session_extension(
501        &self,
502        extension: lash_core::ProtocolSessionExtensionHandle,
503    ) -> Result<()> {
504        self.with_writer(async |runtime: &mut LashRuntime| {
505            runtime
506                .apply_protocol_session_extension(extension)
507                .await
508                .map_err(Into::into)
509        })
510        .await
511    }
512
513    async fn branch_to_node(
514        &self,
515        target_leaf: Option<String>,
516    ) -> Result<lash_core::SessionSnapshot> {
517        self.with_writer(async |runtime: &mut LashRuntime| {
518            runtime
519                .branch_to_node(target_leaf)
520                .await
521                .map_err(Into::into)
522        })
523        .await
524    }
525
526    async fn await_background_work(&self) -> Result<()> {
527        self.with_writer(async |runtime: &mut LashRuntime| {
528            runtime.await_background_work().await.map_err(Into::into)
529        })
530        .await
531    }
532
533    async fn refresh_tool_catalog(&self) -> Result<()> {
534        self.with_writer(async |runtime: &mut LashRuntime| {
535            runtime
536                .refresh_session_tool_catalog()
537                .await
538                .map_err(Into::into)
539        })
540        .await
541    }
542
543    async fn submit_session_command(
544        &self,
545        command: lash_core::SessionCommand,
546        idempotency_key: impl Into<String>,
547    ) -> Result<lash_core::SessionCommandReceipt> {
548        let idempotency_key = idempotency_key.into();
549        self.with_writer(async |runtime: &mut LashRuntime| {
550            runtime
551                .submit_session_command(command, idempotency_key)
552                .await
553                .map_err(Into::into)
554        })
555        .await
556    }
557
558    async fn list_lashlang_trigger_registrations(
559        &self,
560    ) -> Result<Vec<lash_core::TriggerRegistration>> {
561        self.with_writer(async |runtime: &mut LashRuntime| {
562            runtime
563                .list_lashlang_trigger_registrations()
564                .await
565                .map_err(Into::into)
566        })
567        .await
568    }
569
570    async fn lashlang_trigger_registrations_by_source_type(
571        &self,
572        source_type: impl Into<lash_core::TriggerEventType>,
573    ) -> Result<Vec<lash_core::TriggerRegistration>> {
574        self.with_writer(async |runtime: &mut LashRuntime| {
575            runtime
576                .lashlang_trigger_registrations_by_source_type(source_type)
577                .await
578                .map_err(Into::into)
579        })
580        .await
581    }
582
583    async fn invoke_plugin_action(
584        &self,
585        name: &str,
586        args: serde_json::Value,
587    ) -> Result<ToolResult> {
588        let session_id = self.runtime.observe().session_id().to_string();
589        let writer = self.runtime.writer();
590        writer
591            .lock()
592            .await
593            .invoke_plugin_action(name, args, Some(session_id))
594            .await
595            .map_err(Into::into)
596    }
597
598    async fn call_plugin_action<Op: lash_core::PluginAction>(
599        &self,
600        args: Op::Args,
601    ) -> Result<Op::Output> {
602        let result = self
603            .invoke_plugin_action(
604                Op::NAME,
605                serde_json::to_value(args).map_err(|err| {
606                    EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
607                        "invalid {} args: {err}",
608                        Op::NAME
609                    )))
610                })?,
611            )
612            .await?;
613        let Some(output) = result.as_done_output() else {
614            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
615                "{} returned a pending result where completed output is required",
616                Op::NAME
617            ))));
618        };
619        if !output.is_success() {
620            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
621                "{} failed: {}",
622                Op::NAME,
623                output.value_for_projection()
624            ))));
625        }
626        serde_json::from_value(output.value_for_projection()).map_err(|err| {
627            EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
628                "invalid {} output: {err}",
629                Op::NAME
630            )))
631        })
632    }
633
634    async fn compact_context(
635        &self,
636        instructions: Option<String>,
637        scoped_effect_controller: ScopedEffectController<'_>,
638    ) -> Result<bool> {
639        self.with_writer(async |runtime: &mut LashRuntime| {
640            runtime
641                .compact_context(instructions, scoped_effect_controller)
642                .await
643                .map_err(Into::into)
644        })
645        .await
646    }
647
648    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
649        self.with_writer(async |runtime: &mut LashRuntime| {
650            runtime.await_background_work().await?;
651            Ok(runtime.export_persisted_state())
652        })
653        .await
654    }
655
656    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
657        Ok(self.runtime.observe().list_process_handles().await)
658    }
659
660    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
661        Ok(self.runtime.observe().list_all_process_handles().await)
662    }
663
664    async fn start_process(
665        &self,
666        request: lash_core::ProcessStartRequest,
667        scoped_effect_controller: ScopedEffectController<'_>,
668    ) -> Result<lash_core::ProcessHandleSummary> {
669        let writer = self.runtime.writer();
670        let runtime = writer.lock().await;
671        let session_id = runtime.session_id().to_string();
672        let processes = runtime.process_service()?;
673        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
674        let summary = processes
675            .start_from_request(&session_id, request, scope)
676            .await
677            .map_err(EmbedError::Plugin)?;
678        self.runtime.record_process_changed(
679            SessionProcessEventKind::Started,
680            vec![summary.process_id.clone()],
681        );
682        Ok(summary)
683    }
684
685    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
686        self.runtime
687            .writer()
688            .lock()
689            .await
690            .session_state_service()
691            .map_err(Into::into)
692    }
693
694    async fn cancel_process(
695        &self,
696        process_id: &str,
697        scoped_effect_controller: ScopedEffectController<'_>,
698    ) -> Result<lash_core::ProcessCancelSummary> {
699        let writer = self.runtime.writer();
700        let runtime = writer.lock().await;
701        let session_id = runtime.session_id().to_string();
702        let processes = runtime.process_service()?;
703        let cancel_ability = runtime.process_cancel_ability();
704        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
705        let summary = cancel_ability
706            .cancel_summary(
707                processes.as_ref(),
708                lash_core::ProcessCancelRequest::new(
709                    &session_id,
710                    process_id,
711                    scope,
712                    lash_core::ProcessCancelSource::HostApi,
713                )
714                .with_reason("requested by host API"),
715            )
716            .await
717            .map_err(EmbedError::Plugin)?;
718        self.runtime.record_process_changed(
719            SessionProcessEventKind::Cancelled,
720            vec![summary.process_id.clone()],
721        );
722        Ok(summary)
723    }
724
725    async fn cancel_visible_processes(
726        &self,
727        scoped_effect_controller: ScopedEffectController<'_>,
728    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
729        let writer = self.runtime.writer();
730        let runtime = writer.lock().await;
731        let session_id = runtime.session_id().to_string();
732        let processes = runtime.process_service()?;
733        let cancel_ability = runtime.process_cancel_ability();
734        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
735        let summaries = cancel_ability
736            .cancel_all_visible(
737                processes.as_ref(),
738                lash_core::ProcessCancelAllRequest::new(
739                    &session_id,
740                    scope,
741                    lash_core::ProcessCancelSource::HostApi,
742                )
743                .with_reason("requested by host API"),
744            )
745            .await
746            .map_err(EmbedError::Plugin)?;
747        self.runtime.record_process_changed(
748            SessionProcessEventKind::Cancelled,
749            summaries
750                .iter()
751                .map(|summary| summary.process_id.clone())
752                .collect(),
753        );
754        Ok(summaries)
755    }
756
757    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
758        self.with_writer(async |runtime: &mut LashRuntime| {
759            runtime.snapshot_execution_state().await.map_err(Into::into)
760        })
761        .await
762    }
763
764    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
765        self.with_writer(async |runtime: &mut LashRuntime| {
766            runtime
767                .restore_execution_state(bytes)
768                .await
769                .map_err(Into::into)
770        })
771        .await
772    }
773
774    async fn tool_state(&self) -> Result<ToolState> {
775        self.runtime.observe().tool_state.clone().ok_or_else(|| {
776            EmbedError::Session(SessionError::Protocol(
777                "runtime session not available".to_string(),
778            ))
779        })
780    }
781
782    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
783        self.with_writer(async |runtime: &mut LashRuntime| {
784            runtime
785                .apply_tool_state(state)
786                .await
787                .map_err(EmbedError::from)
788        })
789        .await
790    }
791
792    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
793        self.with_writer(async |runtime: &mut LashRuntime| {
794            runtime
795                .restore_tool_state(state)
796                .await
797                .map_err(EmbedError::from)
798        })
799        .await
800    }
801
802    async fn set_tool_availability(
803        &self,
804        name: &str,
805        availability: ToolAvailability,
806    ) -> Result<u64> {
807        self.set_tool_availability_many(&[(name, availability)])
808            .await
809    }
810
811    async fn set_tool_availability_many<N: AsRef<str>>(
812        &self,
813        updates: &[(N, ToolAvailability)],
814    ) -> Result<u64> {
815        let mut state = self.tool_state().await?;
816        for (name, availability) in updates {
817            state
818                .set_availability(name.as_ref(), Some(*availability))
819                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
820        }
821        self.apply_tool_state(state).await
822    }
823
824    async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
825        let mut state = self.tool_state().await?;
826        state
827            .set_availability(name, None)
828            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
829        self.apply_tool_state(state).await
830    }
831
832    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
833        Ok(self.tool_state().await?.tool_manifests())
834    }
835
836    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
837        let tool_registry = self.tool_registry().await?;
838        let handle = tool_registry
839            .add_tool_provider(provider)
840            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
841        self.refresh_tool_catalog().await?;
842        Ok(handle)
843    }
844
845    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
846        let tool_registry = self.tool_registry().await?;
847        let generation = tool_registry
848            .remove_source(handle)
849            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
850        self.refresh_tool_catalog().await?;
851        Ok(generation)
852    }
853
854    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
855        let writer = self.runtime.writer();
856        let runtime = writer.lock().await;
857        let lifecycle = runtime.session_lifecycle_service()?;
858        lifecycle.create_session(request).await.map_err(Into::into)
859    }
860
861    async fn close_child_session(&self, session_id: &str) -> Result<()> {
862        let writer = self.runtime.writer();
863        let runtime = writer.lock().await;
864        let lifecycle = runtime.session_lifecycle_service()?;
865        lifecycle
866            .close_session(session_id)
867            .await
868            .map_err(Into::into)
869    }
870
871    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
872        self.with_writer(async |runtime: &mut LashRuntime| {
873            runtime
874                .activate_managed_session(session_id)
875                .await
876                .map_err(Into::into)
877        })
878        .await
879    }
880
881    async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
882        self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
883            .await
884    }
885
886    async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
887        for input in messages {
888            let source_key = input.id.map(|id| format!("injection:{id}"));
889            let turn_input = turn_input_from_plugin_message(input.message);
890            self.runtime
891                .enqueue_turn_input(
892                    turn_input,
893                    lash_core::DeliveryPolicy::EarliestSafeBoundary,
894                    lash_core::SlotPolicy::Join,
895                    source_key,
896                )
897                .await
898                .map(|_| ())
899                .map_err(EmbedError::Runtime)?;
900        }
901        Ok(())
902    }
903
904    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
905        self.runtime
906            .writer()
907            .lock()
908            .await
909            .plugin_session()
910            .map(|session| session.tool_registry())
911            .ok_or_else(|| {
912                EmbedError::Session(SessionError::Protocol(
913                    "tool registry is unavailable in this runtime session".to_string(),
914                ))
915            })
916    }
917}
918
919fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
920    let mut input = TurnInput::empty();
921    if !message.content.is_empty() {
922        input.items.push(InputItem::Text {
923            text: message.content,
924        });
925    }
926    for (index, bytes) in message.images.into_iter().enumerate() {
927        let id = format!("injected-image-{index}");
928        input.items.push(InputItem::ImageRef { id: id.clone() });
929        input.image_blobs.insert(id, bytes);
930    }
931    input
932}
933
934#[derive(Clone)]
935pub struct SessionConfigAdmin {
936    control: SessionAdmin,
937}
938
939impl SessionConfigAdmin {
940    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
941        self.control.update_config(patch).await
942    }
943
944    pub async fn update_session_config(
945        &self,
946        provider: Option<ProviderHandle>,
947        model: Option<lash_core::ModelSpec>,
948        prompt: Option<PromptLayer>,
949    ) -> Result<()> {
950        self.control
951            .update_session_config(provider, model, prompt)
952            .await
953    }
954
955    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
956        self.control.set_prompt_template(template).await
957    }
958
959    pub async fn clear_prompt_template(&self) -> Result<()> {
960        self.control.clear_prompt_template().await
961    }
962
963    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
964        self.control.add_prompt_contribution(contribution).await
965    }
966
967    pub async fn replace_prompt_slot(
968        &self,
969        slot: PromptSlot,
970        contributions: impl IntoIterator<Item = PromptContribution>,
971    ) -> Result<()> {
972        self.control.replace_prompt_slot(slot, contributions).await
973    }
974
975    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
976        self.control.clear_prompt_slot(slot).await
977    }
978}
979
980#[derive(Clone)]
981pub struct ToolAdmin {
982    control: SessionAdmin,
983}
984
985impl ToolAdmin {
986    pub(crate) fn new(control: SessionAdmin) -> Self {
987        Self { control }
988    }
989}
990
991impl ToolAdmin {
992    pub async fn state(&self) -> Result<ToolState> {
993        self.control.tool_state().await
994    }
995
996    pub fn advanced(&self) -> AdvancedToolAdmin {
997        AdvancedToolAdmin {
998            control: self.control.clone(),
999        }
1000    }
1001
1002    pub async fn set_availability(
1003        &self,
1004        name: impl AsRef<str>,
1005        availability: ToolAvailability,
1006    ) -> Result<u64> {
1007        self.control
1008            .set_tool_availability(name.as_ref(), availability)
1009            .await
1010    }
1011
1012    pub async fn set_availability_many<N: AsRef<str>>(
1013        &self,
1014        updates: &[(N, ToolAvailability)],
1015    ) -> Result<u64> {
1016        self.control.set_tool_availability_many(updates).await
1017    }
1018
1019    pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
1020        self.control
1021            .clear_tool_availability_override(name.as_ref())
1022            .await
1023    }
1024
1025    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1026        self.control.active_tool_manifests().await
1027    }
1028
1029    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1030        self.control.add_tool_provider(provider).await
1031    }
1032
1033    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1034        self.control.remove_tool_source(handle).await
1035    }
1036}
1037
1038#[derive(Clone)]
1039pub struct AdvancedToolAdmin {
1040    control: SessionAdmin,
1041}
1042
1043impl AdvancedToolAdmin {
1044    /// Replace the entire tool-state snapshot.
1045    ///
1046    /// This is a generation-checked escape hatch for hosts that intentionally
1047    /// edit the full snapshot. Prefer `ToolAdmin` availability methods for
1048    /// ordinary tool policy changes.
1049    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1050        self.control.apply_tool_state(state).await
1051    }
1052
1053    /// Restore a persisted tool-state snapshot, adopting its generation.
1054    ///
1055    /// Use this when re-applying a snapshot read from durable storage (session
1056    /// resume), not an edited delta: it reconstructs the exact persisted surface
1057    /// idempotently rather than requiring the snapshot to match the current
1058    /// generation. A cold resume of a session whose surface reached generation
1059    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
1060    ///
1061    /// Persisted tools whose source is not currently registered (e.g. a
1062    /// detached MCP server) do not fail the restore: they are kept as orphans,
1063    /// forced `Off`, listed in the returned [`ToolRestoreReport`], and rebind
1064    /// automatically when a source re-advertises the same tool.
1065    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1066        self.control.restore_tool_state(state).await
1067    }
1068}
1069
1070#[derive(Clone)]
1071pub struct SessionCommandAdmin {
1072    control: SessionAdmin,
1073}
1074
1075impl SessionCommandAdmin {
1076    /// Enqueue an unconditional tool-catalog refresh. The command drains
1077    /// asynchronously and recomputes the surface from live sources, so it
1078    /// takes no generation guard — any generation observed at enqueue time
1079    /// could legitimately have advanced by drain time.
1080    pub async fn refresh_tool_catalog(
1081        &self,
1082        reason: impl Into<String>,
1083        idempotency_key: impl Into<String>,
1084    ) -> Result<lash_core::SessionCommandReceipt> {
1085        self.control
1086            .submit_session_command(
1087                lash_core::SessionCommand::RefreshToolCatalog {
1088                    reason: reason.into(),
1089                },
1090                idempotency_key,
1091            )
1092            .await
1093    }
1094
1095    pub async fn reset(
1096        &self,
1097        reason: impl Into<String>,
1098        idempotency_key: impl Into<String>,
1099    ) -> Result<lash_core::SessionCommandReceipt> {
1100        self.control
1101            .submit_session_command(
1102                lash_core::SessionCommand::ResetSession {
1103                    reason: reason.into(),
1104                },
1105                idempotency_key,
1106            )
1107            .await
1108    }
1109}
1110
1111/// Session-scoped read controls for Lashlang trigger registrations.
1112#[derive(Clone)]
1113pub struct SessionTriggerAdmin {
1114    control: SessionAdmin,
1115}
1116
1117impl SessionTriggerAdmin {
1118    /// Return every trigger registration in the session.
1119    ///
1120    /// This is an admin/introspection view. Source owners should prefer
1121    /// [`Self::by_source_type`] so they only inspect registrations for the
1122    /// concrete source type they own.
1123    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1124        self.control.list_lashlang_trigger_registrations().await
1125    }
1126
1127    /// Return registrations whose source value has the given host descriptor type.
1128    ///
1129    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1130    /// source uses it to inspect registrations for keys it may schedule and emit.
1131    pub async fn by_source_type(
1132        &self,
1133        source_type: impl Into<lash_core::TriggerEventType>,
1134    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1135        self.control
1136            .lashlang_trigger_registrations_by_source_type(source_type)
1137            .await
1138    }
1139}
1140
1141#[derive(Clone)]
1142pub struct SessionProcessAdmin {
1143    control: SessionAdmin,
1144}
1145
1146impl SessionProcessAdmin {
1147    pub(crate) fn new(control: SessionAdmin) -> Self {
1148        Self { control }
1149    }
1150
1151    pub async fn start(
1152        &self,
1153        request: lash_core::ProcessStartRequest,
1154        scoped_effect_controller: ScopedEffectController<'_>,
1155    ) -> Result<lash_core::ProcessHandleSummary> {
1156        self.control
1157            .start_process(request, scoped_effect_controller)
1158            .await
1159    }
1160
1161    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1162        self.control.list_process_handles().await
1163    }
1164
1165    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1166        self.control.list_all_process_handles().await
1167    }
1168
1169    pub async fn await_all(&self) -> Result<()> {
1170        self.control.await_background_work().await
1171    }
1172
1173    pub async fn cancel(
1174        &self,
1175        process_id: &str,
1176        scoped_effect_controller: ScopedEffectController<'_>,
1177    ) -> Result<lash_core::ProcessCancelSummary> {
1178        self.control
1179            .cancel_process(process_id, scoped_effect_controller)
1180            .await
1181    }
1182
1183    pub async fn cancel_all(
1184        &self,
1185        scoped_effect_controller: ScopedEffectController<'_>,
1186    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1187        self.control
1188            .cancel_visible_processes(scoped_effect_controller)
1189            .await
1190    }
1191}
1192
1193#[derive(Clone)]
1194pub struct SessionStateAdmin {
1195    control: SessionAdmin,
1196}
1197
1198impl SessionStateAdmin {
1199    pub async fn export(&self) -> lash_core::SessionSnapshot {
1200        self.control.export_state().await
1201    }
1202
1203    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1204        self.control.append_messages(messages).await
1205    }
1206
1207    pub async fn append_plugin_body(
1208        &self,
1209        plugin_type: impl Into<String>,
1210        body: serde_json::Value,
1211    ) -> Result<()> {
1212        self.control.append_plugin_body(plugin_type, body).await
1213    }
1214
1215    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1216        self.control.set_persisted_state(state).await
1217    }
1218
1219    pub async fn branch_to_node(
1220        &self,
1221        target_leaf: Option<String>,
1222    ) -> Result<lash_core::SessionSnapshot> {
1223        self.control.branch_to_node(target_leaf).await
1224    }
1225
1226    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1227        self.control.persist_current_state().await
1228    }
1229
1230    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1231        self.control.session_state_service().await
1232    }
1233
1234    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1235        self.control.snapshot_execution_state().await
1236    }
1237
1238    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1239        self.control.restore_execution_state(bytes).await
1240    }
1241
1242    pub async fn compact_context(
1243        &self,
1244        instructions: Option<String>,
1245        scoped_effect_controller: ScopedEffectController<'_>,
1246    ) -> Result<bool> {
1247        self.control
1248            .compact_context(instructions, scoped_effect_controller)
1249            .await
1250    }
1251}
1252
1253#[derive(Clone)]
1254pub struct PluginActions {
1255    pub(crate) control: SessionAdmin,
1256}
1257
1258impl PluginActions {
1259    pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1260        self.control.call_plugin_action::<Op>(args).await
1261    }
1262}
1263
1264#[derive(Clone)]
1265pub struct ChildSessionAdmin {
1266    control: SessionAdmin,
1267}
1268
1269impl ChildSessionAdmin {
1270    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1271        self.control.create_child_session(request).await
1272    }
1273
1274    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1275        self.control.close_child_session(session_id).await
1276    }
1277
1278    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1279        self.control.activate_managed_session(session_id).await
1280    }
1281}
1282
1283#[derive(Clone)]
1284pub struct InjectionAdmin {
1285    control: SessionAdmin,
1286}
1287
1288impl InjectionAdmin {
1289    pub async fn inject_turn_input(
1290        &self,
1291        id: Option<String>,
1292        message: PluginMessage,
1293    ) -> Result<()> {
1294        self.control.inject_turn_input(id, message).await
1295    }
1296
1297    pub async fn inject_turn_inputs(
1298        &self,
1299        messages: Vec<lash_core::InjectedTurnInput>,
1300    ) -> Result<()> {
1301        self.control.inject_turn_inputs(messages).await
1302    }
1303}
1304
1305#[derive(Clone)]
1306pub struct ModeAdmin {
1307    control: SessionAdmin,
1308}
1309
1310impl ModeAdmin {
1311    pub async fn apply_session_extension(
1312        &self,
1313        extension: lash_core::ProtocolSessionExtensionHandle,
1314    ) -> Result<()> {
1315        self.control
1316            .apply_protocol_session_extension(extension)
1317            .await
1318    }
1319}