Skip to main content

lash/
admin.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let drivers = self.core.work_driver.drivers().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            self.core.env.process_registry.clone(),
47            drivers.process,
48        );
49        router
50            .emit(request, scoped_effect_controller.controller())
51            .await
52            .map_err(Into::into)
53    }
54
55    pub async fn subscriptions(
56        &self,
57        filter: lash_core::TriggerSubscriptionFilter,
58    ) -> Result<Vec<lash_core::TriggerRegistration>> {
59        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60            EmbedError::Plugin(lash_core::PluginError::Session(
61                "trigger store is unavailable in this runtime".to_string(),
62            ))
63        })?;
64        let records = store.list_subscriptions(filter).await?;
65        Ok(records
66            .iter()
67            .map(lash_core::TriggerRegistration::from)
68            .collect())
69    }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74    pub(crate) core: LashCore,
75}
76
77impl Processes {
78    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79        self.core
80            .env
81            .process_registry
82            .as_ref()
83            .cloned()
84            .ok_or_else(|| {
85                EmbedError::Plugin(lash_core::PluginError::Session(
86                    "process registry is unavailable in this runtime".to_string(),
87                ))
88            })
89    }
90
91    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93    }
94
95    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96        let effect_id = command.effect_id();
97        lash_core::RuntimeInvocation::effect(
98            lash_core::runtime::RuntimeScope::new("runtime"),
99            effect_id.clone(),
100            lash_core::RuntimeEffectKind::Process,
101            effect_id,
102        )
103    }
104
105    async fn run_command(
106        &self,
107        command: lash_core::ProcessCommand,
108        scoped_effect_controller: ScopedEffectController<'_>,
109    ) -> Result<lash_core::ProcessEffectOutcome> {
110        let registry = self.registry()?;
111        let invocation = Self::process_invocation(&command);
112        let outcome = scoped_effect_controller
113            .controller()
114            .execute_effect(
115                lash_core::RuntimeEffectEnvelope::new(
116                    invocation,
117                    lash_core::RuntimeEffectCommand::process(command),
118                ),
119                lash_core::RuntimeEffectLocalExecutor::processes(registry),
120            )
121            .await
122            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123        match outcome {
124            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126                "process effect returned non-process outcome".to_string(),
127            ))),
128        }
129    }
130
131    pub async fn start(
132        &self,
133        request: lash_core::ProcessStartRequest,
134        scoped_effect_controller: ScopedEffectController<'_>,
135    ) -> Result<lash_core::ProcessRecord> {
136        let env_ref = match request.env_spec.as_ref() {
137            Some(env_spec) => Some(
138                lash_core::runtime::persist_process_execution_env(
139                    self.core.env.core.durability.process_env_store.as_ref(),
140                    env_spec,
141                )
142                .await?,
143            ),
144            None => None,
145        };
146        let grant = request.grant.clone();
147        let registration = request.into_registration(env_ref);
148        let command = lash_core::ProcessCommand::Start {
149            registration,
150            grant,
151            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152        };
153        let outcome = self
154            .run_command(command, scoped_effect_controller.clone())
155            .await?;
156        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158                "process start returned the wrong outcome".to_string(),
159            )));
160        };
161        if let Some(driver) = self.core.work_driver.drivers().await.process {
162            driver.claim_and_run_pending("admin_process_start").await?;
163        }
164        Ok(record)
165    }
166
167    pub async fn list(
168        &self,
169        filter: &lash_core::ProcessListFilter,
170    ) -> Result<Vec<lash_core::ObservedProcess>> {
171        self.make_observer()?.list(filter).await.map_err(Into::into)
172    }
173
174    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175        Ok(self.make_observer()?.process(process_id).await)
176    }
177
178    pub async fn events(
179        &self,
180        process_id: &str,
181        after_sequence: u64,
182    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183        self.make_observer()?
184            .events_after(process_id, after_sequence)
185            .await
186            .map_err(Into::into)
187    }
188
189    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190        self.registry()?
191            .await_process(process_id)
192            .await
193            .map_err(Into::into)
194    }
195
196    pub async fn cancel(
197        &self,
198        process_id: &str,
199        scoped_effect_controller: ScopedEffectController<'_>,
200    ) -> Result<lash_core::ProcessCancelSummary> {
201        let command = lash_core::ProcessCommand::Cancel {
202            process_id: process_id.to_string(),
203            reason: Some("requested by host".to_string()),
204        };
205        let outcome = self
206            .run_command(command, scoped_effect_controller.clone())
207            .await?;
208        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210                "process cancel returned the wrong outcome".to_string(),
211            )));
212        };
213        Ok(lash_core::ProcessCancelSummary::from_record(record))
214    }
215
216    pub async fn signal(
217        &self,
218        process_id: &str,
219        signal_name: impl Into<String>,
220        signal_id: impl Into<String>,
221        request: lash_core::ProcessEventAppendRequest,
222        scoped_effect_controller: ScopedEffectController<'_>,
223    ) -> Result<lash_core::ProcessEvent> {
224        let signal_name = signal_name.into();
225        let event_type = request.event_type.clone();
226        let payload = request.payload.clone();
227        let command = lash_core::ProcessCommand::Signal {
228            process_id: process_id.to_string(),
229            signal_name: signal_name.clone(),
230            signal_id: signal_id.into(),
231            request,
232        };
233        let outcome = self
234            .run_command(command, scoped_effect_controller.clone())
235            .await?;
236        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238                "process signal returned the wrong outcome".to_string(),
239            )));
240        };
241        let registry = self.registry()?;
242        let waiting_ordinal =
243            registry
244                .get_process(process_id)
245                .await
246                .and_then(|record| match record.wait {
247                    Some(lash_core::WaitState {
248                        kind:
249                            lash_core::WaitKind::Signal {
250                                name,
251                                event_type: wait_event_type,
252                                ordinal,
253                                ..
254                            },
255                        ..
256                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257                    _ => None,
258                });
259        let ordinal = match waiting_ordinal {
260            Some(ordinal) => ordinal,
261            None => {
262                registry
263                    .count_events_through(process_id, &event_type, event.sequence)
264                    .await?
265            }
266        };
267        if ordinal > 0 {
268            let key = scoped_effect_controller
269                .controller()
270                .await_event_key(
271                    &lash_core::ExecutionScope::process(process_id),
272                    lash_core::AwaitEventWaitIdentity::process_signal(
273                        process_id,
274                        &signal_name,
275                        ordinal,
276                    ),
277                )
278                .await
279                .map_err(|err| {
280                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281                })?;
282            let _ = scoped_effect_controller
283                .controller()
284                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285                .await
286                .map_err(|err| {
287                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288                })?;
289        }
290        Ok(event)
291    }
292
293    pub async fn session_snapshot(
294        &self,
295        session_id: impl Into<String>,
296    ) -> Result<lash_core::ProcessWorkSnapshot> {
297        self.make_observer()?
298            .snapshot_for_session(session_id)
299            .await
300            .map_err(Into::into)
301    }
302
303    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304        self.make_observer()
305    }
306}
307
/// Session-scoped administrative handle wrapping a `RuntimeHandle`.
#[derive(Clone)]
pub struct SessionAdmin {
    // Handle through which admin operations reach the runtime writer and its observations.
    pub(crate) runtime: RuntimeHandle,
}
312
313impl SessionAdmin {
314    pub fn config(&self) -> SessionConfigAdmin {
315        SessionConfigAdmin {
316            control: self.clone(),
317        }
318    }
319
320    pub fn tools(&self) -> ToolAdmin {
321        ToolAdmin {
322            control: self.clone(),
323        }
324    }
325
326    pub fn commands(&self) -> SessionCommandAdmin {
327        SessionCommandAdmin {
328            control: self.clone(),
329        }
330    }
331
332    pub fn triggers(&self) -> SessionTriggerAdmin {
333        SessionTriggerAdmin {
334            control: self.clone(),
335        }
336    }
337
338    pub fn state(&self) -> SessionStateAdmin {
339        SessionStateAdmin {
340            control: self.clone(),
341        }
342    }
343
344    pub fn children(&self) -> ChildSessionAdmin {
345        ChildSessionAdmin {
346            control: self.clone(),
347        }
348    }
349
350    pub fn injection(&self) -> InjectionAdmin {
351        InjectionAdmin {
352            control: self.clone(),
353        }
354    }
355
356    pub fn protocol(&self) -> ProtocolAdmin {
357        ProtocolAdmin {
358            control: self.clone(),
359        }
360    }
361
362    pub fn processes(&self) -> SessionProcessAdmin {
363        SessionProcessAdmin {
364            control: self.clone(),
365        }
366    }
367
368    /// Run `f` against the locked runtime writer, then publish the resulting
369    /// observation. The body is the canonical `lock → call → publish_from`
370    /// stamp shared by nearly every mutating control method; publish happens
371    /// unconditionally once the closure returns.
372    async fn with_writer<F, T>(&self, f: F) -> T
373    where
374        F: AsyncFnOnce(&mut LashRuntime) -> T,
375    {
376        let writer = self.runtime.writer();
377        let mut runtime = writer.lock().await;
378        let value = f(&mut runtime).await;
379        self.runtime.publish_from(&runtime);
380        value
381    }
382
383    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384        self.update_session_config(patch.provider, patch.model, patch.prompt)
385            .await?;
386        Ok(())
387    }
388
389    async fn update_session_config(
390        &self,
391        provider: Option<ProviderHandle>,
392        model: Option<lash_core::ModelSpec>,
393        prompt: Option<PromptLayer>,
394    ) -> Result<()> {
395        self.with_writer(async |runtime: &mut LashRuntime| {
396            runtime.update_session_config(provider, model, prompt).await;
397        })
398        .await;
399        Ok(())
400    }
401
402    async fn export_state(&self) -> lash_core::SessionSnapshot {
403        self.runtime.observe().read_view.to_snapshot()
404    }
405
406    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407        self.with_writer(async |runtime: &mut LashRuntime| {
408            runtime
409                .append_session_nodes(lash_core::AppendSessionNodesRequest {
410                    nodes: messages
411                        .into_iter()
412                        .map(lash_core::SessionAppendNode::message)
413                        .collect(),
414                    requires_ancestor_node_id: None,
415                })
416                .await
417                .map(|_| ())
418                .map_err(Into::into)
419        })
420        .await
421    }
422
423    async fn append_plugin_body(
424        &self,
425        plugin_type: impl Into<String>,
426        body: serde_json::Value,
427    ) -> Result<()> {
428        self.with_writer(async |runtime: &mut LashRuntime| {
429            runtime
430                .append_session_nodes(lash_core::AppendSessionNodesRequest {
431                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432                    requires_ancestor_node_id: None,
433                })
434                .await
435                .map(|_| ())
436                .map_err(Into::into)
437        })
438        .await
439    }
440
441    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442        self.with_writer(async |runtime: &mut LashRuntime| {
443            runtime.set_persisted_state(state).map_err(Into::into)
444        })
445        .await
446    }
447
448    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449        self.with_writer(async |runtime: &mut LashRuntime| {
450            runtime.set_prompt_template(template).await;
451        })
452        .await;
453        Ok(())
454    }
455
456    async fn clear_prompt_template(&self) -> Result<()> {
457        self.with_writer(async |runtime: &mut LashRuntime| {
458            runtime.clear_prompt_template().await;
459        })
460        .await;
461        Ok(())
462    }
463
464    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465        self.with_writer(async |runtime: &mut LashRuntime| {
466            runtime.add_prompt_contribution(contribution).await;
467        })
468        .await;
469        Ok(())
470    }
471
472    async fn replace_prompt_slot(
473        &self,
474        slot: PromptSlot,
475        contributions: impl IntoIterator<Item = PromptContribution>,
476    ) -> Result<()> {
477        self.with_writer(async |runtime: &mut LashRuntime| {
478            runtime.replace_prompt_slot(slot, contributions).await;
479        })
480        .await;
481        Ok(())
482    }
483
484    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485        self.with_writer(async |runtime: &mut LashRuntime| {
486            runtime.clear_prompt_slot(slot).await;
487        })
488        .await;
489        Ok(())
490    }
491
492    async fn apply_protocol_session_extension(
493        &self,
494        extension: lash_core::ProtocolSessionExtensionHandle,
495    ) -> Result<()> {
496        self.with_writer(async |runtime: &mut LashRuntime| {
497            runtime
498                .apply_protocol_session_extension(extension)
499                .await
500                .map_err(Into::into)
501        })
502        .await
503    }
504
505    async fn branch_to_node(
506        &self,
507        target_leaf: Option<String>,
508    ) -> Result<lash_core::SessionSnapshot> {
509        self.with_writer(async |runtime: &mut LashRuntime| {
510            runtime
511                .branch_to_node(target_leaf)
512                .await
513                .map_err(Into::into)
514        })
515        .await
516    }
517
518    async fn await_background_work(&self) -> Result<()> {
519        self.with_writer(async |runtime: &mut LashRuntime| {
520            runtime.await_background_work().await.map_err(Into::into)
521        })
522        .await
523    }
524
525    async fn refresh_tool_catalog(&self) -> Result<()> {
526        self.with_writer(async |runtime: &mut LashRuntime| {
527            runtime
528                .refresh_session_tool_catalog()
529                .await
530                .map_err(Into::into)
531        })
532        .await
533    }
534
535    async fn submit_session_command(
536        &self,
537        command: lash_core::SessionCommand,
538        idempotency_key: impl Into<String>,
539    ) -> Result<lash_core::SessionCommandReceipt> {
540        let idempotency_key = idempotency_key.into();
541        self.with_writer(async |runtime: &mut LashRuntime| {
542            runtime
543                .submit_session_command(command, idempotency_key)
544                .await
545                .map_err(Into::into)
546        })
547        .await
548    }
549
550    async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551        self.with_writer(async |runtime: &mut LashRuntime| {
552            runtime
553                .list_trigger_registrations()
554                .await
555                .map_err(Into::into)
556        })
557        .await
558    }
559
560    async fn trigger_registrations_by_source_type(
561        &self,
562        source_type: impl Into<lash_core::TriggerEventType>,
563    ) -> Result<Vec<lash_core::TriggerRegistration>> {
564        self.with_writer(async |runtime: &mut LashRuntime| {
565            runtime
566                .trigger_registrations_by_source_type(source_type)
567                .await
568                .map_err(Into::into)
569        })
570        .await
571    }
572
573    async fn query_plugin_raw(
574        &self,
575        name: &str,
576        args: serde_json::Value,
577    ) -> Result<(String, serde_json::Value)> {
578        let observation = self.runtime.observe();
579        let session_id = observation.session_id().to_string();
580        observation
581            .query_plugin(name, args, Some(session_id))
582            .await
583            .map_err(Into::into)
584    }
585
586    async fn run_plugin_command_raw(
587        &self,
588        name: &str,
589        args: serde_json::Value,
590    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
591        let session_id = self.runtime.observe().session_id().to_string();
592        let writer = self.runtime.writer();
593        let mut runtime = writer.lock().await;
594        let receipt = runtime
595            .run_plugin_command(name, args, Some(session_id))
596            .await?;
597        self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
598        self.runtime.publish_from(&runtime);
599        Ok(receipt)
600    }
601
602    async fn run_plugin_task_raw_with_cancel(
603        &self,
604        name: &str,
605        args: serde_json::Value,
606        cancellation_token: CancellationToken,
607    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
608        let session_id = self.runtime.observe().session_id().to_string();
609        let writer = self.runtime.writer();
610        let mut runtime = writer.lock().await;
611        let scope_id = format!(
612            "{session_id}:plugin_task:{name}:{}",
613            lash_core::TurnActivityId::fresh().0
614        );
615        let scoped_effect_controller = runtime
616            .effect_host()
617            .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
618            .map_err(EmbedError::Runtime)?
619            .ok_or_else(|| {
620                EmbedError::Plugin(lash_core::PluginError::Session(
621                    "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
622                ))
623            })?;
624        let receipt = runtime
625            .run_plugin_task(
626                name,
627                args,
628                Some(session_id),
629                scoped_effect_controller,
630                cancellation_token,
631            )
632            .await?;
633        self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
634        self.runtime.publish_from(&runtime);
635        Ok(receipt)
636    }
637
638    fn record_plugin_operation_observations(
639        &self,
640        events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
641        pending_turn_inputs: &[lash_core::PendingTurnInput],
642    ) {
643        for owned in events {
644            self.runtime
645                .record_turn_activity(lash_core::TurnActivity::independent(
646                    lash_core::TurnEvent::PluginRuntime {
647                        plugin_id: owned.plugin_id.clone(),
648                        event: owned.value.clone(),
649                    },
650                ));
651        }
652        if !pending_turn_inputs.is_empty() {
653            self.runtime.record_queue_changed(
654                lash_core::SessionQueueEventKind::Enqueued,
655                pending_turn_inputs
656                    .iter()
657                    .map(|input| input.input_id.clone())
658                    .collect(),
659            );
660        }
661    }
662
663    async fn compact_context(
664        &self,
665        instructions: Option<String>,
666        scoped_effect_controller: ScopedEffectController<'_>,
667    ) -> Result<bool> {
668        self.with_writer(async |runtime: &mut LashRuntime| {
669            runtime
670                .compact_context(instructions, scoped_effect_controller)
671                .await
672                .map_err(Into::into)
673        })
674        .await
675    }
676
677    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
678        self.with_writer(async |runtime: &mut LashRuntime| {
679            runtime.await_background_work().await?;
680            Ok(runtime.export_persisted_state())
681        })
682        .await
683    }
684
685    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
686        Ok(self.runtime.observe().list_process_handles().await)
687    }
688
689    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
690        Ok(self.runtime.observe().list_all_process_handles().await)
691    }
692
693    async fn start_process(
694        &self,
695        request: lash_core::ProcessStartRequest,
696        scoped_effect_controller: ScopedEffectController<'_>,
697    ) -> Result<lash_core::ProcessHandleSummary> {
698        let writer = self.runtime.writer();
699        let runtime = writer.lock().await;
700        let session_id = runtime.session_id().to_string();
701        let processes = runtime.process_service()?;
702        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703        let summary = processes
704            .start_from_request(&session_id, request, scope)
705            .await
706            .map_err(EmbedError::Plugin)?;
707        self.runtime.record_process_changed(
708            SessionProcessEventKind::Started,
709            vec![summary.process_id.clone()],
710        );
711        Ok(summary)
712    }
713
714    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
715        self.runtime
716            .writer()
717            .lock()
718            .await
719            .session_state_service()
720            .map_err(Into::into)
721    }
722
723    async fn cancel_process(
724        &self,
725        process_id: &str,
726        scoped_effect_controller: ScopedEffectController<'_>,
727    ) -> Result<lash_core::ProcessCancelSummary> {
728        let writer = self.runtime.writer();
729        let runtime = writer.lock().await;
730        let session_id = runtime.session_id().to_string();
731        let processes = runtime.process_service()?;
732        let cancel_ability = runtime.process_cancel_ability();
733        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
734        let summary = cancel_ability
735            .cancel_summary(
736                processes.as_ref(),
737                lash_core::ProcessCancelRequest::new(
738                    &session_id,
739                    process_id,
740                    scope,
741                    lash_core::ProcessCancelSource::HostApi,
742                )
743                .with_reason("requested by host API"),
744            )
745            .await
746            .map_err(EmbedError::Plugin)?;
747        self.runtime.record_process_changed(
748            SessionProcessEventKind::Cancelled,
749            vec![summary.process_id.clone()],
750        );
751        Ok(summary)
752    }
753
754    async fn cancel_visible_processes(
755        &self,
756        scoped_effect_controller: ScopedEffectController<'_>,
757    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
758        let writer = self.runtime.writer();
759        let runtime = writer.lock().await;
760        let session_id = runtime.session_id().to_string();
761        let processes = runtime.process_service()?;
762        let cancel_ability = runtime.process_cancel_ability();
763        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
764        let summaries = cancel_ability
765            .cancel_all_visible(
766                processes.as_ref(),
767                lash_core::ProcessCancelAllRequest::new(
768                    &session_id,
769                    scope,
770                    lash_core::ProcessCancelSource::HostApi,
771                )
772                .with_reason("requested by host API"),
773            )
774            .await
775            .map_err(EmbedError::Plugin)?;
776        self.runtime.record_process_changed(
777            SessionProcessEventKind::Cancelled,
778            summaries
779                .iter()
780                .map(|summary| summary.process_id.clone())
781                .collect(),
782        );
783        Ok(summaries)
784    }
785
786    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
787        self.with_writer(async |runtime: &mut LashRuntime| {
788            runtime.snapshot_execution_state().await.map_err(Into::into)
789        })
790        .await
791    }
792
793    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
794        self.with_writer(async |runtime: &mut LashRuntime| {
795            runtime
796                .restore_execution_state(bytes)
797                .await
798                .map_err(Into::into)
799        })
800        .await
801    }
802
803    async fn tool_state(&self) -> Result<ToolState> {
804        self.runtime.observe().tool_state.clone().ok_or_else(|| {
805            EmbedError::Session(SessionError::Protocol(
806                "runtime session not available".to_string(),
807            ))
808        })
809    }
810
811    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
812        self.with_writer(async |runtime: &mut LashRuntime| {
813            runtime
814                .apply_tool_state(state)
815                .await
816                .map_err(EmbedError::from)
817        })
818        .await
819    }
820
821    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
822        self.with_writer(async |runtime: &mut LashRuntime| {
823            runtime
824                .restore_tool_state(state)
825                .await
826                .map_err(EmbedError::from)
827        })
828        .await
829    }
830
831    async fn set_tool_membership(&self, tool_id: lash_core::ToolId, present: bool) -> Result<u64> {
832        self.set_tool_membership_many(&[(tool_id, present)]).await
833    }
834
835    async fn set_tool_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
836        let mut state = self.tool_state().await?;
837        for (tool_id, present) in updates {
838            state
839                .set_membership(tool_id, *present)
840                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
841        }
842        self.apply_tool_state(state).await
843    }
844
845    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
846        Ok(self.tool_state().await?.tool_manifests())
847    }
848
849    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
850        let tool_registry = self.tool_registry().await?;
851        let handle = tool_registry
852            .add_tool_provider(provider)
853            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
854        self.refresh_tool_catalog().await?;
855        Ok(handle)
856    }
857
858    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
859        let tool_registry = self.tool_registry().await?;
860        let generation = tool_registry
861            .remove_source(handle)
862            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
863        self.refresh_tool_catalog().await?;
864        Ok(generation)
865    }
866
867    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
868        let writer = self.runtime.writer();
869        let runtime = writer.lock().await;
870        let lifecycle = runtime.session_lifecycle_service()?;
871        lifecycle.create_session(request).await.map_err(Into::into)
872    }
873
874    async fn close_child_session(&self, session_id: &str) -> Result<()> {
875        let writer = self.runtime.writer();
876        let runtime = writer.lock().await;
877        let lifecycle = runtime.session_lifecycle_service()?;
878        lifecycle
879            .close_session(session_id)
880            .await
881            .map_err(Into::into)
882    }
883
884    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
885        self.with_writer(async |runtime: &mut LashRuntime| {
886            runtime
887                .activate_managed_session(session_id)
888                .await
889                .map_err(Into::into)
890        })
891        .await
892    }
893
894    async fn inject_turn_input(
895        &self,
896        turn_id: &str,
897        id: Option<String>,
898        message: PluginMessage,
899    ) -> Result<()> {
900        self.inject_turn_inputs_for_turn(
901            turn_id,
902            vec![lash_core::InjectedTurnInput { id, message }],
903        )
904        .await
905    }
906
907    async fn inject_turn_inputs_for_turn(
908        &self,
909        turn_id: &str,
910        messages: Vec<lash_core::InjectedTurnInput>,
911    ) -> Result<()> {
912        for input in messages {
913            let source_key = input.id.map(|id| format!("injection:{id}"));
914            let turn_input = turn_input_from_plugin_message(input.message);
915            self.runtime
916                .enqueue_turn_input(
917                    turn_input,
918                    lash_core::TurnInputIngress::active_turn(
919                        turn_id,
920                        lash_core::TurnInputCheckpointBoundary::AfterWork,
921                    ),
922                    source_key,
923                )
924                .await
925                .map(|_| ())
926                .map_err(EmbedError::Runtime)?;
927        }
928        Ok(())
929    }
930
931    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
932        self.runtime
933            .writer()
934            .lock()
935            .await
936            .plugin_session()
937            .map(|session| session.tool_registry())
938            .ok_or_else(|| {
939                EmbedError::Session(SessionError::Protocol(
940                    "tool registry is unavailable in this runtime session".to_string(),
941                ))
942            })
943    }
944}
945
946fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
947    let mut input = TurnInput::empty();
948    if !message.content.is_empty() {
949        input.items.push(InputItem::Text {
950            text: message.content,
951        });
952    }
953    for (index, bytes) in message.images.into_iter().enumerate() {
954        let id = format!("injected-image-{index}");
955        input.items.push(InputItem::ImageRef { id: id.clone() });
956        input.image_blobs.insert(id, bytes);
957    }
958    input
959}
960
961#[derive(Clone)]
962pub struct SessionConfigAdmin {
963    control: SessionAdmin,
964}
965
966impl SessionConfigAdmin {
967    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
968        self.control.update_config(patch).await
969    }
970
971    pub async fn update_session_config(
972        &self,
973        provider: Option<ProviderHandle>,
974        model: Option<lash_core::ModelSpec>,
975        prompt: Option<PromptLayer>,
976    ) -> Result<()> {
977        self.control
978            .update_session_config(provider, model, prompt)
979            .await
980    }
981
982    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
983        self.control.set_prompt_template(template).await
984    }
985
986    pub async fn clear_prompt_template(&self) -> Result<()> {
987        self.control.clear_prompt_template().await
988    }
989
990    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
991        self.control.add_prompt_contribution(contribution).await
992    }
993
994    pub async fn replace_prompt_slot(
995        &self,
996        slot: PromptSlot,
997        contributions: impl IntoIterator<Item = PromptContribution>,
998    ) -> Result<()> {
999        self.control.replace_prompt_slot(slot, contributions).await
1000    }
1001
1002    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
1003        self.control.clear_prompt_slot(slot).await
1004    }
1005}
1006
1007#[derive(Clone)]
1008pub struct ToolAdmin {
1009    control: SessionAdmin,
1010}
1011
1012impl ToolAdmin {
1013    pub(crate) fn new(control: SessionAdmin) -> Self {
1014        Self { control }
1015    }
1016}
1017
1018impl ToolAdmin {
1019    pub async fn state(&self) -> Result<ToolState> {
1020        self.control.tool_state().await
1021    }
1022
1023    pub fn advanced(&self) -> AdvancedToolAdmin {
1024        AdvancedToolAdmin {
1025            control: self.control.clone(),
1026        }
1027    }
1028
1029    /// Toggle Tool Catalog membership for a tool. `present` adds it as a
1030    /// member; `!present` removes it. Membership is the execution gate.
1031    pub async fn set_membership(
1032        &self,
1033        tool_id: impl Into<lash_core::ToolId>,
1034        present: bool,
1035    ) -> Result<u64> {
1036        self.control
1037            .set_tool_membership(tool_id.into(), present)
1038            .await
1039    }
1040
1041    pub async fn set_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
1042        self.control.set_tool_membership_many(updates).await
1043    }
1044
1045    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1046        self.control.active_tool_manifests().await
1047    }
1048
1049    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1050        self.control.add_tool_provider(provider).await
1051    }
1052
1053    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1054        self.control.remove_tool_source(handle).await
1055    }
1056}
1057
1058#[derive(Clone)]
1059pub struct AdvancedToolAdmin {
1060    control: SessionAdmin,
1061}
1062
1063impl AdvancedToolAdmin {
1064    /// Replace the entire tool-state snapshot.
1065    ///
1066    /// This is a generation-checked escape hatch for hosts that intentionally
1067    /// edit the full snapshot. Prefer `ToolAdmin` membership methods for
1068    /// ordinary tool policy changes.
1069    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1070        self.control.apply_tool_state(state).await
1071    }
1072
1073    /// Restore a persisted tool-state snapshot, adopting its generation.
1074    ///
1075    /// Use this when re-applying a snapshot read from durable storage (session
1076    /// resume), not an edited delta: it reconstructs the exact persisted surface
1077    /// idempotently rather than requiring the snapshot to match the current
1078    /// generation. A cold resume of a session whose surface reached generation
1079    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
1080    ///
1081    /// Persisted tools whose source is not currently registered (e.g. a
1082    /// detached MCP server) do not fail the restore: they are kept as orphaned
1083    /// non-members, listed in the returned [`ToolRestoreReport`], and rebind
1084    /// automatically when a source re-advertises the same tool.
1085    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1086        self.control.restore_tool_state(state).await
1087    }
1088}
1089
1090#[derive(Clone)]
1091pub struct SessionCommandAdmin {
1092    control: SessionAdmin,
1093}
1094
1095impl SessionCommandAdmin {
1096    /// Enqueue an unconditional tool-catalog refresh. The command drains
1097    /// asynchronously and recomputes the surface from live sources, so it
1098    /// takes no generation guard — any generation observed at enqueue time
1099    /// could legitimately have advanced by drain time.
1100    pub async fn refresh_tool_catalog(
1101        &self,
1102        reason: impl Into<String>,
1103        idempotency_key: impl Into<String>,
1104    ) -> Result<lash_core::SessionCommandReceipt> {
1105        self.control
1106            .submit_session_command(
1107                lash_core::SessionCommand::RefreshToolCatalog {
1108                    reason: reason.into(),
1109                },
1110                idempotency_key,
1111            )
1112            .await
1113    }
1114
1115    pub async fn reset(
1116        &self,
1117        reason: impl Into<String>,
1118        idempotency_key: impl Into<String>,
1119    ) -> Result<lash_core::SessionCommandReceipt> {
1120        self.control
1121            .submit_session_command(
1122                lash_core::SessionCommand::ResetSession {
1123                    reason: reason.into(),
1124                },
1125                idempotency_key,
1126            )
1127            .await
1128    }
1129}
1130
1131/// Session-scoped read controls for Lashlang trigger registrations.
1132#[derive(Clone)]
1133pub struct SessionTriggerAdmin {
1134    control: SessionAdmin,
1135}
1136
1137impl SessionTriggerAdmin {
1138    /// Return every trigger registration in the session.
1139    ///
1140    /// This is an admin/introspection view. Source owners should prefer
1141    /// [`Self::by_source_type`] so they only inspect registrations for the
1142    /// concrete source type they own.
1143    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1144        self.control.list_trigger_registrations().await
1145    }
1146
1147    /// Return registrations whose source value has the given host descriptor type.
1148    ///
1149    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1150    /// source uses it to inspect registrations for keys it may schedule and emit.
1151    pub async fn by_source_type(
1152        &self,
1153        source_type: impl Into<lash_core::TriggerEventType>,
1154    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1155        self.control
1156            .trigger_registrations_by_source_type(source_type)
1157            .await
1158    }
1159}
1160
1161#[derive(Clone)]
1162pub struct SessionProcessAdmin {
1163    control: SessionAdmin,
1164}
1165
1166impl SessionProcessAdmin {
1167    pub(crate) fn new(control: SessionAdmin) -> Self {
1168        Self { control }
1169    }
1170
1171    pub async fn start(
1172        &self,
1173        request: lash_core::ProcessStartRequest,
1174        scoped_effect_controller: ScopedEffectController<'_>,
1175    ) -> Result<lash_core::ProcessHandleSummary> {
1176        self.control
1177            .start_process(request, scoped_effect_controller)
1178            .await
1179    }
1180
1181    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1182        self.control.list_process_handles().await
1183    }
1184
1185    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1186        self.control.list_all_process_handles().await
1187    }
1188
1189    pub async fn await_all(&self) -> Result<()> {
1190        self.control.await_background_work().await
1191    }
1192
1193    pub async fn cancel(
1194        &self,
1195        process_id: &str,
1196        scoped_effect_controller: ScopedEffectController<'_>,
1197    ) -> Result<lash_core::ProcessCancelSummary> {
1198        self.control
1199            .cancel_process(process_id, scoped_effect_controller)
1200            .await
1201    }
1202
1203    pub async fn cancel_all(
1204        &self,
1205        scoped_effect_controller: ScopedEffectController<'_>,
1206    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1207        self.control
1208            .cancel_visible_processes(scoped_effect_controller)
1209            .await
1210    }
1211}
1212
1213#[derive(Clone)]
1214pub struct SessionStateAdmin {
1215    control: SessionAdmin,
1216}
1217
1218impl SessionStateAdmin {
1219    pub async fn export(&self) -> lash_core::SessionSnapshot {
1220        self.control.export_state().await
1221    }
1222
1223    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1224        self.control.append_messages(messages).await
1225    }
1226
1227    pub async fn append_plugin_body(
1228        &self,
1229        plugin_type: impl Into<String>,
1230        body: serde_json::Value,
1231    ) -> Result<()> {
1232        self.control.append_plugin_body(plugin_type, body).await
1233    }
1234
1235    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1236        self.control.set_persisted_state(state).await
1237    }
1238
1239    pub async fn branch_to_node(
1240        &self,
1241        target_leaf: Option<String>,
1242    ) -> Result<lash_core::SessionSnapshot> {
1243        self.control.branch_to_node(target_leaf).await
1244    }
1245
1246    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1247        self.control.persist_current_state().await
1248    }
1249
1250    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1251        self.control.session_state_service().await
1252    }
1253
1254    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1255        self.control.snapshot_execution_state().await
1256    }
1257
1258    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1259        self.control.restore_execution_state(bytes).await
1260    }
1261
1262    pub async fn compact_context(
1263        &self,
1264        instructions: Option<String>,
1265        scoped_effect_controller: ScopedEffectController<'_>,
1266    ) -> Result<bool> {
1267        self.control
1268            .compact_context(instructions, scoped_effect_controller)
1269            .await
1270    }
1271}
1272
1273#[derive(Clone)]
1274pub struct PluginOperations {
1275    pub(crate) control: SessionAdmin,
1276}
1277
1278impl PluginOperations {
1279    pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1280        let (_plugin_id, output) = self
1281            .control
1282            .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1283            .await?;
1284        decode_plugin_output::<Op>(output)
1285    }
1286
1287    pub async fn query_raw(
1288        &self,
1289        name: &str,
1290        args: serde_json::Value,
1291    ) -> Result<(String, serde_json::Value)> {
1292        self.control.query_plugin_raw(name, args).await
1293    }
1294
1295    pub async fn run_command<Op: lash_core::PluginCommand>(
1296        &self,
1297        args: Op::Args,
1298    ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1299        let receipt = self
1300            .control
1301            .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1302            .await?;
1303        Ok(lash_core::PluginCommandReceipt {
1304            output: decode_plugin_output::<Op>(receipt.output)?,
1305            events: receipt.events,
1306            pending_turn_inputs: receipt.pending_turn_inputs,
1307        })
1308    }
1309
1310    pub async fn run_command_raw(
1311        &self,
1312        name: &str,
1313        args: serde_json::Value,
1314    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1315        self.control.run_plugin_command_raw(name, args).await
1316    }
1317
1318    pub async fn run_task<Op: lash_core::PluginTask>(
1319        &self,
1320        args: Op::Args,
1321    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1322        self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1323            .await
1324    }
1325
1326    pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1327        &self,
1328        args: Op::Args,
1329        cancellation_token: CancellationToken,
1330    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1331        let receipt = self
1332            .control
1333            .run_plugin_task_raw_with_cancel(
1334                Op::NAME,
1335                encode_plugin_args::<Op>(args)?,
1336                cancellation_token,
1337            )
1338            .await?;
1339        Ok(lash_core::PluginTaskReceipt {
1340            output: decode_plugin_output::<Op>(receipt.output)?,
1341            events: receipt.events,
1342            pending_turn_inputs: receipt.pending_turn_inputs,
1343        })
1344    }
1345
1346    pub async fn run_task_raw(
1347        &self,
1348        name: &str,
1349        args: serde_json::Value,
1350    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1351        self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1352            .await
1353    }
1354
1355    pub async fn run_task_raw_with_cancel(
1356        &self,
1357        name: &str,
1358        args: serde_json::Value,
1359        cancellation_token: CancellationToken,
1360    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1361        self.control
1362            .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1363            .await
1364    }
1365}
1366
1367fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1368    serde_json::to_value(args).map_err(|err| {
1369        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1370            "invalid {} args: {err}",
1371            Op::NAME
1372        )))
1373    })
1374}
1375
1376fn decode_plugin_output<Op: lash_core::PluginOperation>(
1377    output: serde_json::Value,
1378) -> Result<Op::Output> {
1379    serde_json::from_value(output).map_err(|err| {
1380        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1381            "invalid {} output: {err}",
1382            Op::NAME
1383        )))
1384    })
1385}
1386
1387#[derive(Clone)]
1388pub struct ChildSessionAdmin {
1389    control: SessionAdmin,
1390}
1391
1392impl ChildSessionAdmin {
1393    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1394        self.control.create_child_session(request).await
1395    }
1396
1397    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1398        self.control.close_child_session(session_id).await
1399    }
1400
1401    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1402        self.control.activate_managed_session(session_id).await
1403    }
1404}
1405
1406#[derive(Clone)]
1407pub struct InjectionAdmin {
1408    control: SessionAdmin,
1409}
1410
1411impl InjectionAdmin {
1412    pub async fn inject_turn_input(
1413        &self,
1414        turn_id: &str,
1415        id: Option<String>,
1416        message: PluginMessage,
1417    ) -> Result<()> {
1418        self.control.inject_turn_input(turn_id, id, message).await
1419    }
1420
1421    pub async fn inject_turn_inputs_for_turn(
1422        &self,
1423        turn_id: &str,
1424        messages: Vec<lash_core::InjectedTurnInput>,
1425    ) -> Result<()> {
1426        self.control
1427            .inject_turn_inputs_for_turn(turn_id, messages)
1428            .await
1429    }
1430}
1431
1432#[derive(Clone)]
1433pub struct ProtocolAdmin {
1434    control: SessionAdmin,
1435}
1436
1437impl ProtocolAdmin {
1438    pub async fn apply_session_extension(
1439        &self,
1440        extension: lash_core::ProtocolSessionExtensionHandle,
1441    ) -> Result<()> {
1442        self.control
1443            .apply_protocol_session_extension(extension)
1444            .await
1445    }
1446}