Skip to main content

lash/
admin.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let drivers = self.core.work_driver.drivers().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            self.core.env.process_registry.clone(),
47            drivers.process,
48        );
49        router
50            .emit(request, scoped_effect_controller.controller())
51            .await
52            .map_err(Into::into)
53    }
54
55    pub async fn subscriptions(
56        &self,
57        filter: lash_core::TriggerSubscriptionFilter,
58    ) -> Result<Vec<lash_core::TriggerRegistration>> {
59        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60            EmbedError::Plugin(lash_core::PluginError::Session(
61                "trigger store is unavailable in this runtime".to_string(),
62            ))
63        })?;
64        let records = store.list_subscriptions(filter).await?;
65        Ok(records
66            .iter()
67            .map(lash_core::TriggerRegistration::from)
68            .collect())
69    }
70}
71
/// Process administration handle. Its methods reach the process registry
/// through `core.env.process_registry`.
#[derive(Clone)]
pub struct Processes {
    pub(crate) core: LashCore,
}
76
77impl Processes {
78    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79        self.core
80            .env
81            .process_registry
82            .as_ref()
83            .cloned()
84            .ok_or_else(|| {
85                EmbedError::Plugin(lash_core::PluginError::Session(
86                    "process registry is unavailable in this runtime".to_string(),
87                ))
88            })
89    }
90
91    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93    }
94
95    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96        let effect_id = command.effect_id();
97        lash_core::RuntimeInvocation::effect(
98            lash_core::runtime::RuntimeScope::new("runtime"),
99            effect_id.clone(),
100            lash_core::RuntimeEffectKind::Process,
101            effect_id,
102        )
103    }
104
105    async fn run_command(
106        &self,
107        command: lash_core::ProcessCommand,
108        scoped_effect_controller: ScopedEffectController<'_>,
109    ) -> Result<lash_core::ProcessEffectOutcome> {
110        let registry = self.registry()?;
111        let invocation = Self::process_invocation(&command);
112        let outcome = scoped_effect_controller
113            .controller()
114            .execute_effect(
115                lash_core::RuntimeEffectEnvelope::new(
116                    invocation,
117                    lash_core::RuntimeEffectCommand::process(command),
118                ),
119                lash_core::RuntimeEffectLocalExecutor::processes(registry),
120            )
121            .await
122            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123        match outcome {
124            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126                "process effect returned non-process outcome".to_string(),
127            ))),
128        }
129    }
130
131    pub async fn start(
132        &self,
133        request: lash_core::ProcessStartRequest,
134        scoped_effect_controller: ScopedEffectController<'_>,
135    ) -> Result<lash_core::ProcessRecord> {
136        let env_ref = match request.env_spec.as_ref() {
137            Some(env_spec) => Some(
138                lash_core::runtime::persist_process_execution_env(
139                    self.core.env.core.durability.process_env_store.as_ref(),
140                    env_spec,
141                )
142                .await?,
143            ),
144            None => None,
145        };
146        let grant = request.grant.clone();
147        let registration = request.into_registration(env_ref);
148        let command = lash_core::ProcessCommand::Start {
149            registration,
150            grant,
151            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152        };
153        let outcome = self
154            .run_command(command, scoped_effect_controller.clone())
155            .await?;
156        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158                "process start returned the wrong outcome".to_string(),
159            )));
160        };
161        if let Some(driver) = self.core.work_driver.drivers().await.process {
162            driver.claim_and_run_pending("admin_process_start").await?;
163        }
164        Ok(record)
165    }
166
167    pub async fn list(
168        &self,
169        filter: &lash_core::ProcessListFilter,
170    ) -> Result<Vec<lash_core::ObservedProcess>> {
171        self.make_observer()?.list(filter).await.map_err(Into::into)
172    }
173
174    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175        Ok(self.make_observer()?.process(process_id).await)
176    }
177
178    pub async fn events(
179        &self,
180        process_id: &str,
181        after_sequence: u64,
182    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183        self.make_observer()?
184            .events_after(process_id, after_sequence)
185            .await
186            .map_err(Into::into)
187    }
188
189    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190        self.registry()?
191            .await_process(process_id)
192            .await
193            .map_err(Into::into)
194    }
195
196    pub async fn cancel(
197        &self,
198        process_id: &str,
199        scoped_effect_controller: ScopedEffectController<'_>,
200    ) -> Result<lash_core::ProcessCancelSummary> {
201        let command = lash_core::ProcessCommand::Cancel {
202            process_id: process_id.to_string(),
203            reason: Some("requested by host".to_string()),
204        };
205        let outcome = self
206            .run_command(command, scoped_effect_controller.clone())
207            .await?;
208        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210                "process cancel returned the wrong outcome".to_string(),
211            )));
212        };
213        Ok(lash_core::ProcessCancelSummary::from_record(record))
214    }
215
    /// Sends a signal to `process_id` and, when an ordinal can be determined,
    /// resolves the matching process-signal await event with the request
    /// payload.
    ///
    /// Flow:
    /// 1. Run a `ProcessCommand::Signal` through `run_command`; the outcome
    ///    must be `ProcessEffectOutcome::Signal`.
    /// 2. Read the process record. If it is waiting on a signal with the same
    ///    name and event type, use that wait's ordinal; otherwise fall back to
    ///    `count_events_through` for this event type up to the appended
    ///    event's sequence.
    /// 3. When the ordinal is greater than zero, derive the await-event key
    ///    for `(process, signal name, ordinal)` and resolve it with
    ///    `Resolution::Ok(payload)`.
    ///
    /// # Errors
    ///
    /// Propagates failures from `run_command`, the registry lookup, and
    /// `count_events_through`. Controller failures while deriving or resolving
    /// the await-event key are stringified into `PluginError::Session`.
    pub async fn signal(
        &self,
        process_id: &str,
        signal_name: impl Into<String>,
        signal_id: impl Into<String>,
        request: lash_core::ProcessEventAppendRequest,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessEvent> {
        let signal_name = signal_name.into();
        // `request` is moved into the command below; keep the pieces needed
        // for ordinal lookup and await-event resolution.
        let event_type = request.event_type.clone();
        let payload = request.payload.clone();
        let command = lash_core::ProcessCommand::Signal {
            process_id: process_id.to_string(),
            signal_name: signal_name.clone(),
            signal_id: signal_id.into(),
            request,
        };
        // The controller is used again further down, hence the clone here.
        let outcome = self
            .run_command(command, scoped_effect_controller.clone())
            .await?;
        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
                "process signal returned the wrong outcome".to_string(),
            )));
        };
        let registry = self.registry()?;
        // Ordinal recorded on the process's current wait state, but only when
        // that wait is for this exact signal name and event type.
        let waiting_ordinal =
            registry
                .get_process(process_id)
                .await
                .and_then(|record| match record.wait {
                    Some(lash_core::WaitState {
                        kind:
                            lash_core::WaitKind::Signal {
                                name,
                                event_type: wait_event_type,
                                ordinal,
                                ..
                            },
                        ..
                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
                    _ => None,
                });
        // Fallback: count events of this type up to the appended event.
        let ordinal = match waiting_ordinal {
            Some(ordinal) => ordinal,
            None => {
                registry
                    .count_events_through(process_id, &event_type, event.sequence)
                    .await?
            }
        };
        // An ordinal of zero skips await-event resolution entirely.
        if ordinal > 0 {
            let key = scoped_effect_controller
                .controller()
                .await_event_key(
                    &lash_core::ExecutionScope::process(process_id),
                    lash_core::AwaitEventWaitIdentity::process_signal(
                        process_id,
                        &signal_name,
                        ordinal,
                    ),
                )
                .await
                .map_err(|err| {
                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
                })?;
            // The resolve outcome itself is discarded; only failures matter.
            let _ = scoped_effect_controller
                .controller()
                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
                .await
                .map_err(|err| {
                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
                })?;
        }
        Ok(event)
    }
292
293    pub async fn session_snapshot(
294        &self,
295        session_id: impl Into<String>,
296    ) -> Result<lash_core::ProcessWorkSnapshot> {
297        self.make_observer()?
298            .snapshot_for_session(session_id)
299            .await
300            .map_err(Into::into)
301    }
302
303    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304        self.make_observer()
305    }
306}
307
/// Session control handle wrapping a `RuntimeHandle`. Its accessor methods
/// hand out per-area admin facades that each hold a clone of this handle.
#[derive(Clone)]
pub struct SessionAdmin {
    pub(crate) runtime: RuntimeHandle,
}
312
313impl SessionAdmin {
314    pub fn config(&self) -> SessionConfigAdmin {
315        SessionConfigAdmin {
316            control: self.clone(),
317        }
318    }
319
320    pub fn tools(&self) -> ToolAdmin {
321        ToolAdmin {
322            control: self.clone(),
323        }
324    }
325
326    pub fn commands(&self) -> SessionCommandAdmin {
327        SessionCommandAdmin {
328            control: self.clone(),
329        }
330    }
331
332    pub fn triggers(&self) -> SessionTriggerAdmin {
333        SessionTriggerAdmin {
334            control: self.clone(),
335        }
336    }
337
338    pub fn state(&self) -> SessionStateAdmin {
339        SessionStateAdmin {
340            control: self.clone(),
341        }
342    }
343
344    pub fn children(&self) -> ChildSessionAdmin {
345        ChildSessionAdmin {
346            control: self.clone(),
347        }
348    }
349
350    pub fn injection(&self) -> InjectionAdmin {
351        InjectionAdmin {
352            control: self.clone(),
353        }
354    }
355
356    pub fn protocol(&self) -> ProtocolAdmin {
357        ProtocolAdmin {
358            control: self.clone(),
359        }
360    }
361
362    pub fn processes(&self) -> SessionProcessAdmin {
363        SessionProcessAdmin {
364            control: self.clone(),
365        }
366    }
367
368    /// Run `f` against the locked runtime writer, then publish the resulting
369    /// observation. The body is the canonical `lock → call → publish_from`
370    /// stamp shared by nearly every mutating control method; publish happens
371    /// unconditionally once the closure returns.
372    async fn with_writer<F, T>(&self, f: F) -> T
373    where
374        F: AsyncFnOnce(&mut LashRuntime) -> T,
375    {
376        let writer = self.runtime.writer();
377        let mut runtime = writer.lock().await;
378        let value = f(&mut runtime).await;
379        self.runtime.publish_from(&runtime);
380        value
381    }
382
383    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384        self.update_session_config(patch.provider, patch.model, patch.prompt)
385            .await?;
386        Ok(())
387    }
388
389    async fn update_session_config(
390        &self,
391        provider: Option<ProviderHandle>,
392        model: Option<lash_core::ModelSpec>,
393        prompt: Option<PromptLayer>,
394    ) -> Result<()> {
395        self.with_writer(async |runtime: &mut LashRuntime| {
396            runtime.update_session_config(provider, model, prompt).await;
397        })
398        .await;
399        Ok(())
400    }
401
402    async fn export_state(&self) -> lash_core::SessionSnapshot {
403        self.runtime.observe().read_view.to_snapshot()
404    }
405
406    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407        self.with_writer(async |runtime: &mut LashRuntime| {
408            runtime
409                .append_session_nodes(lash_core::AppendSessionNodesRequest {
410                    nodes: messages
411                        .into_iter()
412                        .map(lash_core::SessionAppendNode::message)
413                        .collect(),
414                    requires_ancestor_node_id: None,
415                })
416                .await
417                .map(|_| ())
418                .map_err(Into::into)
419        })
420        .await
421    }
422
423    async fn append_plugin_body(
424        &self,
425        plugin_type: impl Into<String>,
426        body: serde_json::Value,
427    ) -> Result<()> {
428        self.with_writer(async |runtime: &mut LashRuntime| {
429            runtime
430                .append_session_nodes(lash_core::AppendSessionNodesRequest {
431                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432                    requires_ancestor_node_id: None,
433                })
434                .await
435                .map(|_| ())
436                .map_err(Into::into)
437        })
438        .await
439    }
440
441    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442        self.with_writer(async |runtime: &mut LashRuntime| {
443            runtime.set_persisted_state(state).map_err(Into::into)
444        })
445        .await
446    }
447
448    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449        self.with_writer(async |runtime: &mut LashRuntime| {
450            runtime.set_prompt_template(template).await;
451        })
452        .await;
453        Ok(())
454    }
455
456    async fn clear_prompt_template(&self) -> Result<()> {
457        self.with_writer(async |runtime: &mut LashRuntime| {
458            runtime.clear_prompt_template().await;
459        })
460        .await;
461        Ok(())
462    }
463
464    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465        self.with_writer(async |runtime: &mut LashRuntime| {
466            runtime.add_prompt_contribution(contribution).await;
467        })
468        .await;
469        Ok(())
470    }
471
472    async fn replace_prompt_slot(
473        &self,
474        slot: PromptSlot,
475        contributions: impl IntoIterator<Item = PromptContribution>,
476    ) -> Result<()> {
477        self.with_writer(async |runtime: &mut LashRuntime| {
478            runtime.replace_prompt_slot(slot, contributions).await;
479        })
480        .await;
481        Ok(())
482    }
483
484    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485        self.with_writer(async |runtime: &mut LashRuntime| {
486            runtime.clear_prompt_slot(slot).await;
487        })
488        .await;
489        Ok(())
490    }
491
492    async fn apply_protocol_session_extension(
493        &self,
494        extension: lash_core::ProtocolSessionExtensionHandle,
495    ) -> Result<()> {
496        self.with_writer(async |runtime: &mut LashRuntime| {
497            runtime
498                .apply_protocol_session_extension(extension)
499                .await
500                .map_err(Into::into)
501        })
502        .await
503    }
504
505    async fn branch_to_node(
506        &self,
507        target_leaf: Option<String>,
508    ) -> Result<lash_core::SessionSnapshot> {
509        self.with_writer(async |runtime: &mut LashRuntime| {
510            runtime
511                .branch_to_node(target_leaf)
512                .await
513                .map_err(Into::into)
514        })
515        .await
516    }
517
518    async fn await_background_work(&self) -> Result<()> {
519        self.with_writer(async |runtime: &mut LashRuntime| {
520            runtime.await_background_work().await.map_err(Into::into)
521        })
522        .await
523    }
524
525    async fn refresh_tool_catalog(&self) -> Result<()> {
526        self.with_writer(async |runtime: &mut LashRuntime| {
527            runtime
528                .refresh_session_tool_catalog()
529                .await
530                .map_err(Into::into)
531        })
532        .await
533    }
534
535    async fn submit_session_command(
536        &self,
537        command: lash_core::SessionCommand,
538        idempotency_key: impl Into<String>,
539    ) -> Result<lash_core::SessionCommandReceipt> {
540        let idempotency_key = idempotency_key.into();
541        self.with_writer(async |runtime: &mut LashRuntime| {
542            runtime
543                .submit_session_command(command, idempotency_key)
544                .await
545                .map_err(Into::into)
546        })
547        .await
548    }
549
550    async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551        self.with_writer(async |runtime: &mut LashRuntime| {
552            runtime
553                .list_trigger_registrations()
554                .await
555                .map_err(Into::into)
556        })
557        .await
558    }
559
560    async fn trigger_registrations_by_source_type(
561        &self,
562        source_type: impl Into<lash_core::TriggerEventType>,
563    ) -> Result<Vec<lash_core::TriggerRegistration>> {
564        self.with_writer(async |runtime: &mut LashRuntime| {
565            runtime
566                .trigger_registrations_by_source_type(source_type)
567                .await
568                .map_err(Into::into)
569        })
570        .await
571    }
572
573    async fn query_plugin_raw(
574        &self,
575        name: &str,
576        args: serde_json::Value,
577    ) -> Result<(String, serde_json::Value)> {
578        let observation = self.runtime.observe();
579        let session_id = observation.session_id().to_string();
580        observation
581            .query_plugin(name, args, Some(session_id))
582            .await
583            .map_err(Into::into)
584    }
585
586    async fn run_plugin_command_raw(
587        &self,
588        name: &str,
589        args: serde_json::Value,
590    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
591        let session_id = self.runtime.observe().session_id().to_string();
592        let writer = self.runtime.writer();
593        let mut runtime = writer.lock().await;
594        let receipt = runtime
595            .run_plugin_command(name, args, Some(session_id))
596            .await?;
597        self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
598        self.runtime.publish_from(&runtime);
599        Ok(receipt)
600    }
601
602    async fn run_plugin_task_raw_with_cancel(
603        &self,
604        name: &str,
605        args: serde_json::Value,
606        cancellation_token: CancellationToken,
607    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
608        let session_id = self.runtime.observe().session_id().to_string();
609        let writer = self.runtime.writer();
610        let mut runtime = writer.lock().await;
611        let scope_id = format!(
612            "{session_id}:plugin_task:{name}:{}",
613            lash_core::TurnActivityId::fresh().0
614        );
615        let scoped_effect_controller = runtime
616            .effect_host()
617            .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
618            .map_err(EmbedError::Runtime)?
619            .ok_or_else(|| {
620                EmbedError::Plugin(lash_core::PluginError::Session(
621                    "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
622                ))
623            })?;
624        let receipt = runtime
625            .run_plugin_task(
626                name,
627                args,
628                Some(session_id),
629                scoped_effect_controller,
630                cancellation_token,
631            )
632            .await?;
633        self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
634        self.runtime.publish_from(&runtime);
635        Ok(receipt)
636    }
637
638    fn record_plugin_operation_observations(
639        &self,
640        events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
641        queued_batches: &[lash_core::runtime::QueuedWorkBatch],
642    ) {
643        for owned in events {
644            self.runtime
645                .record_turn_activity(lash_core::TurnActivity::independent(
646                    lash_core::TurnEvent::PluginRuntime {
647                        plugin_id: owned.plugin_id.clone(),
648                        event: owned.value.clone(),
649                    },
650                ));
651        }
652        if !queued_batches.is_empty() {
653            self.runtime.record_queue_changed(
654                lash_core::SessionQueueEventKind::Enqueued,
655                queued_batches
656                    .iter()
657                    .map(|batch| batch.batch_id.clone())
658                    .collect(),
659            );
660        }
661    }
662
663    async fn compact_context(
664        &self,
665        instructions: Option<String>,
666        scoped_effect_controller: ScopedEffectController<'_>,
667    ) -> Result<bool> {
668        self.with_writer(async |runtime: &mut LashRuntime| {
669            runtime
670                .compact_context(instructions, scoped_effect_controller)
671                .await
672                .map_err(Into::into)
673        })
674        .await
675    }
676
677    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
678        self.with_writer(async |runtime: &mut LashRuntime| {
679            runtime.await_background_work().await?;
680            Ok(runtime.export_persisted_state())
681        })
682        .await
683    }
684
685    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
686        Ok(self.runtime.observe().list_process_handles().await)
687    }
688
689    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
690        Ok(self.runtime.observe().list_all_process_handles().await)
691    }
692
693    async fn start_process(
694        &self,
695        request: lash_core::ProcessStartRequest,
696        scoped_effect_controller: ScopedEffectController<'_>,
697    ) -> Result<lash_core::ProcessHandleSummary> {
698        let writer = self.runtime.writer();
699        let runtime = writer.lock().await;
700        let session_id = runtime.session_id().to_string();
701        let processes = runtime.process_service()?;
702        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703        let summary = processes
704            .start_from_request(&session_id, request, scope)
705            .await
706            .map_err(EmbedError::Plugin)?;
707        self.runtime.record_process_changed(
708            SessionProcessEventKind::Started,
709            vec![summary.process_id.clone()],
710        );
711        Ok(summary)
712    }
713
714    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
715        self.runtime
716            .writer()
717            .lock()
718            .await
719            .session_state_service()
720            .map_err(Into::into)
721    }
722
723    async fn cancel_process(
724        &self,
725        process_id: &str,
726        scoped_effect_controller: ScopedEffectController<'_>,
727    ) -> Result<lash_core::ProcessCancelSummary> {
728        let writer = self.runtime.writer();
729        let runtime = writer.lock().await;
730        let session_id = runtime.session_id().to_string();
731        let processes = runtime.process_service()?;
732        let cancel_ability = runtime.process_cancel_ability();
733        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
734        let summary = cancel_ability
735            .cancel_summary(
736                processes.as_ref(),
737                lash_core::ProcessCancelRequest::new(
738                    &session_id,
739                    process_id,
740                    scope,
741                    lash_core::ProcessCancelSource::HostApi,
742                )
743                .with_reason("requested by host API"),
744            )
745            .await
746            .map_err(EmbedError::Plugin)?;
747        self.runtime.record_process_changed(
748            SessionProcessEventKind::Cancelled,
749            vec![summary.process_id.clone()],
750        );
751        Ok(summary)
752    }
753
754    async fn cancel_visible_processes(
755        &self,
756        scoped_effect_controller: ScopedEffectController<'_>,
757    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
758        let writer = self.runtime.writer();
759        let runtime = writer.lock().await;
760        let session_id = runtime.session_id().to_string();
761        let processes = runtime.process_service()?;
762        let cancel_ability = runtime.process_cancel_ability();
763        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
764        let summaries = cancel_ability
765            .cancel_all_visible(
766                processes.as_ref(),
767                lash_core::ProcessCancelAllRequest::new(
768                    &session_id,
769                    scope,
770                    lash_core::ProcessCancelSource::HostApi,
771                )
772                .with_reason("requested by host API"),
773            )
774            .await
775            .map_err(EmbedError::Plugin)?;
776        self.runtime.record_process_changed(
777            SessionProcessEventKind::Cancelled,
778            summaries
779                .iter()
780                .map(|summary| summary.process_id.clone())
781                .collect(),
782        );
783        Ok(summaries)
784    }
785
786    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
787        self.with_writer(async |runtime: &mut LashRuntime| {
788            runtime.snapshot_execution_state().await.map_err(Into::into)
789        })
790        .await
791    }
792
793    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
794        self.with_writer(async |runtime: &mut LashRuntime| {
795            runtime
796                .restore_execution_state(bytes)
797                .await
798                .map_err(Into::into)
799        })
800        .await
801    }
802
803    async fn tool_state(&self) -> Result<ToolState> {
804        self.runtime.observe().tool_state.clone().ok_or_else(|| {
805            EmbedError::Session(SessionError::Protocol(
806                "runtime session not available".to_string(),
807            ))
808        })
809    }
810
811    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
812        self.with_writer(async |runtime: &mut LashRuntime| {
813            runtime
814                .apply_tool_state(state)
815                .await
816                .map_err(EmbedError::from)
817        })
818        .await
819    }
820
821    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
822        self.with_writer(async |runtime: &mut LashRuntime| {
823            runtime
824                .restore_tool_state(state)
825                .await
826                .map_err(EmbedError::from)
827        })
828        .await
829    }
830
831    async fn set_tool_availability(
832        &self,
833        tool_id: lash_core::ToolId,
834        availability: ToolAvailability,
835    ) -> Result<u64> {
836        self.set_tool_availability_many(&[(tool_id, availability)])
837            .await
838    }
839
840    async fn set_tool_availability_many(
841        &self,
842        updates: &[(lash_core::ToolId, ToolAvailability)],
843    ) -> Result<u64> {
844        let mut state = self.tool_state().await?;
845        for (tool_id, availability) in updates {
846            state
847                .set_availability(tool_id, Some(*availability))
848                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
849        }
850        self.apply_tool_state(state).await
851    }
852
853    async fn clear_tool_availability_override(&self, tool_id: lash_core::ToolId) -> Result<u64> {
854        let mut state = self.tool_state().await?;
855        state
856            .set_availability(&tool_id, None)
857            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
858        self.apply_tool_state(state).await
859    }
860
861    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
862        Ok(self.tool_state().await?.tool_manifests())
863    }
864
865    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
866        let tool_registry = self.tool_registry().await?;
867        let handle = tool_registry
868            .add_tool_provider(provider)
869            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
870        self.refresh_tool_catalog().await?;
871        Ok(handle)
872    }
873
874    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
875        let tool_registry = self.tool_registry().await?;
876        let generation = tool_registry
877            .remove_source(handle)
878            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
879        self.refresh_tool_catalog().await?;
880        Ok(generation)
881    }
882
883    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
884        let writer = self.runtime.writer();
885        let runtime = writer.lock().await;
886        let lifecycle = runtime.session_lifecycle_service()?;
887        lifecycle.create_session(request).await.map_err(Into::into)
888    }
889
890    async fn close_child_session(&self, session_id: &str) -> Result<()> {
891        let writer = self.runtime.writer();
892        let runtime = writer.lock().await;
893        let lifecycle = runtime.session_lifecycle_service()?;
894        lifecycle
895            .close_session(session_id)
896            .await
897            .map_err(Into::into)
898    }
899
    /// Activate the managed session identified by `session_id`.
    ///
    /// Calls `activate_managed_session` on the `LashRuntime` handed to the
    /// closure by `with_writer`, converting the runtime's error into this
    /// method's error type.
    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
        // NOTE(review): `with_writer` is defined outside this view; it
        // presumably takes the runtime writer lock before running the
        // closure — confirm against its definition.
        self.with_writer(async |runtime: &mut LashRuntime| {
            runtime
                .activate_managed_session(session_id)
                .await
                .map_err(Into::into)
        })
        .await
    }
909
910    async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
911        self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
912            .await
913    }
914
915    async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
916        for input in messages {
917            let source_key = input.id.map(|id| format!("injection:{id}"));
918            let turn_input = turn_input_from_plugin_message(input.message);
919            self.runtime
920                .enqueue_turn_input(
921                    turn_input,
922                    lash_core::DeliveryPolicy::EarliestSafeBoundary,
923                    lash_core::SlotPolicy::Join,
924                    source_key,
925                )
926                .await
927                .map(|_| ())
928                .map_err(EmbedError::Runtime)?;
929        }
930        Ok(())
931    }
932
933    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
934        self.runtime
935            .writer()
936            .lock()
937            .await
938            .plugin_session()
939            .map(|session| session.tool_registry())
940            .ok_or_else(|| {
941                EmbedError::Session(SessionError::Protocol(
942                    "tool registry is unavailable in this runtime session".to_string(),
943                ))
944            })
945    }
946}
947
948fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
949    let mut input = TurnInput::empty();
950    if !message.content.is_empty() {
951        input.items.push(InputItem::Text {
952            text: message.content,
953        });
954    }
955    for (index, bytes) in message.images.into_iter().enumerate() {
956        let id = format!("injected-image-{index}");
957        input.items.push(InputItem::ImageRef { id: id.clone() });
958        input.image_blobs.insert(id, bytes);
959    }
960    input
961}
962
963#[derive(Clone)]
964pub struct SessionConfigAdmin {
965    control: SessionAdmin,
966}
967
968impl SessionConfigAdmin {
969    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
970        self.control.update_config(patch).await
971    }
972
973    pub async fn update_session_config(
974        &self,
975        provider: Option<ProviderHandle>,
976        model: Option<lash_core::ModelSpec>,
977        prompt: Option<PromptLayer>,
978    ) -> Result<()> {
979        self.control
980            .update_session_config(provider, model, prompt)
981            .await
982    }
983
984    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
985        self.control.set_prompt_template(template).await
986    }
987
988    pub async fn clear_prompt_template(&self) -> Result<()> {
989        self.control.clear_prompt_template().await
990    }
991
992    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
993        self.control.add_prompt_contribution(contribution).await
994    }
995
996    pub async fn replace_prompt_slot(
997        &self,
998        slot: PromptSlot,
999        contributions: impl IntoIterator<Item = PromptContribution>,
1000    ) -> Result<()> {
1001        self.control.replace_prompt_slot(slot, contributions).await
1002    }
1003
1004    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
1005        self.control.clear_prompt_slot(slot).await
1006    }
1007}
1008
1009#[derive(Clone)]
1010pub struct ToolAdmin {
1011    control: SessionAdmin,
1012}
1013
1014impl ToolAdmin {
1015    pub(crate) fn new(control: SessionAdmin) -> Self {
1016        Self { control }
1017    }
1018}
1019
1020impl ToolAdmin {
1021    pub async fn state(&self) -> Result<ToolState> {
1022        self.control.tool_state().await
1023    }
1024
1025    pub fn advanced(&self) -> AdvancedToolAdmin {
1026        AdvancedToolAdmin {
1027            control: self.control.clone(),
1028        }
1029    }
1030
1031    pub async fn set_availability(
1032        &self,
1033        tool_id: impl Into<lash_core::ToolId>,
1034        availability: ToolAvailability,
1035    ) -> Result<u64> {
1036        self.control
1037            .set_tool_availability(tool_id.into(), availability)
1038            .await
1039    }
1040
1041    pub async fn set_availability_many(
1042        &self,
1043        updates: &[(lash_core::ToolId, ToolAvailability)],
1044    ) -> Result<u64> {
1045        self.control.set_tool_availability_many(updates).await
1046    }
1047
1048    pub async fn clear_availability_override(
1049        &self,
1050        tool_id: impl Into<lash_core::ToolId>,
1051    ) -> Result<u64> {
1052        self.control
1053            .clear_tool_availability_override(tool_id.into())
1054            .await
1055    }
1056
1057    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1058        self.control.active_tool_manifests().await
1059    }
1060
1061    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1062        self.control.add_tool_provider(provider).await
1063    }
1064
1065    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1066        self.control.remove_tool_source(handle).await
1067    }
1068}
1069
1070#[derive(Clone)]
1071pub struct AdvancedToolAdmin {
1072    control: SessionAdmin,
1073}
1074
1075impl AdvancedToolAdmin {
1076    /// Replace the entire tool-state snapshot.
1077    ///
1078    /// This is a generation-checked escape hatch for hosts that intentionally
1079    /// edit the full snapshot. Prefer `ToolAdmin` availability methods for
1080    /// ordinary tool policy changes.
1081    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1082        self.control.apply_tool_state(state).await
1083    }
1084
1085    /// Restore a persisted tool-state snapshot, adopting its generation.
1086    ///
1087    /// Use this when re-applying a snapshot read from durable storage (session
1088    /// resume), not an edited delta: it reconstructs the exact persisted surface
1089    /// idempotently rather than requiring the snapshot to match the current
1090    /// generation. A cold resume of a session whose surface reached generation
1091    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
1092    ///
1093    /// Persisted tools whose source is not currently registered (e.g. a
1094    /// detached MCP server) do not fail the restore: they are kept as orphans,
1095    /// forced `Off`, listed in the returned [`ToolRestoreReport`], and rebind
1096    /// automatically when a source re-advertises the same tool.
1097    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1098        self.control.restore_tool_state(state).await
1099    }
1100}
1101
1102#[derive(Clone)]
1103pub struct SessionCommandAdmin {
1104    control: SessionAdmin,
1105}
1106
1107impl SessionCommandAdmin {
1108    /// Enqueue an unconditional tool-catalog refresh. The command drains
1109    /// asynchronously and recomputes the surface from live sources, so it
1110    /// takes no generation guard — any generation observed at enqueue time
1111    /// could legitimately have advanced by drain time.
1112    pub async fn refresh_tool_catalog(
1113        &self,
1114        reason: impl Into<String>,
1115        idempotency_key: impl Into<String>,
1116    ) -> Result<lash_core::SessionCommandReceipt> {
1117        self.control
1118            .submit_session_command(
1119                lash_core::SessionCommand::RefreshToolCatalog {
1120                    reason: reason.into(),
1121                },
1122                idempotency_key,
1123            )
1124            .await
1125    }
1126
1127    pub async fn reset(
1128        &self,
1129        reason: impl Into<String>,
1130        idempotency_key: impl Into<String>,
1131    ) -> Result<lash_core::SessionCommandReceipt> {
1132        self.control
1133            .submit_session_command(
1134                lash_core::SessionCommand::ResetSession {
1135                    reason: reason.into(),
1136                },
1137                idempotency_key,
1138            )
1139            .await
1140    }
1141}
1142
1143/// Session-scoped read controls for Lashlang trigger registrations.
1144#[derive(Clone)]
1145pub struct SessionTriggerAdmin {
1146    control: SessionAdmin,
1147}
1148
1149impl SessionTriggerAdmin {
1150    /// Return every trigger registration in the session.
1151    ///
1152    /// This is an admin/introspection view. Source owners should prefer
1153    /// [`Self::by_source_type`] so they only inspect registrations for the
1154    /// concrete source type they own.
1155    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1156        self.control.list_trigger_registrations().await
1157    }
1158
1159    /// Return registrations whose source value has the given host descriptor type.
1160    ///
1161    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1162    /// source uses it to inspect registrations for keys it may schedule and emit.
1163    pub async fn by_source_type(
1164        &self,
1165        source_type: impl Into<lash_core::TriggerEventType>,
1166    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1167        self.control
1168            .trigger_registrations_by_source_type(source_type)
1169            .await
1170    }
1171}
1172
1173#[derive(Clone)]
1174pub struct SessionProcessAdmin {
1175    control: SessionAdmin,
1176}
1177
1178impl SessionProcessAdmin {
1179    pub(crate) fn new(control: SessionAdmin) -> Self {
1180        Self { control }
1181    }
1182
1183    pub async fn start(
1184        &self,
1185        request: lash_core::ProcessStartRequest,
1186        scoped_effect_controller: ScopedEffectController<'_>,
1187    ) -> Result<lash_core::ProcessHandleSummary> {
1188        self.control
1189            .start_process(request, scoped_effect_controller)
1190            .await
1191    }
1192
1193    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1194        self.control.list_process_handles().await
1195    }
1196
1197    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1198        self.control.list_all_process_handles().await
1199    }
1200
1201    pub async fn await_all(&self) -> Result<()> {
1202        self.control.await_background_work().await
1203    }
1204
1205    pub async fn cancel(
1206        &self,
1207        process_id: &str,
1208        scoped_effect_controller: ScopedEffectController<'_>,
1209    ) -> Result<lash_core::ProcessCancelSummary> {
1210        self.control
1211            .cancel_process(process_id, scoped_effect_controller)
1212            .await
1213    }
1214
1215    pub async fn cancel_all(
1216        &self,
1217        scoped_effect_controller: ScopedEffectController<'_>,
1218    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1219        self.control
1220            .cancel_visible_processes(scoped_effect_controller)
1221            .await
1222    }
1223}
1224
1225#[derive(Clone)]
1226pub struct SessionStateAdmin {
1227    control: SessionAdmin,
1228}
1229
1230impl SessionStateAdmin {
1231    pub async fn export(&self) -> lash_core::SessionSnapshot {
1232        self.control.export_state().await
1233    }
1234
1235    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1236        self.control.append_messages(messages).await
1237    }
1238
1239    pub async fn append_plugin_body(
1240        &self,
1241        plugin_type: impl Into<String>,
1242        body: serde_json::Value,
1243    ) -> Result<()> {
1244        self.control.append_plugin_body(plugin_type, body).await
1245    }
1246
1247    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1248        self.control.set_persisted_state(state).await
1249    }
1250
1251    pub async fn branch_to_node(
1252        &self,
1253        target_leaf: Option<String>,
1254    ) -> Result<lash_core::SessionSnapshot> {
1255        self.control.branch_to_node(target_leaf).await
1256    }
1257
1258    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1259        self.control.persist_current_state().await
1260    }
1261
1262    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1263        self.control.session_state_service().await
1264    }
1265
1266    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1267        self.control.snapshot_execution_state().await
1268    }
1269
1270    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1271        self.control.restore_execution_state(bytes).await
1272    }
1273
1274    pub async fn compact_context(
1275        &self,
1276        instructions: Option<String>,
1277        scoped_effect_controller: ScopedEffectController<'_>,
1278    ) -> Result<bool> {
1279        self.control
1280            .compact_context(instructions, scoped_effect_controller)
1281            .await
1282    }
1283}
1284
1285#[derive(Clone)]
1286pub struct PluginOperations {
1287    pub(crate) control: SessionAdmin,
1288}
1289
1290impl PluginOperations {
1291    pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1292        let (_plugin_id, output) = self
1293            .control
1294            .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1295            .await?;
1296        decode_plugin_output::<Op>(output)
1297    }
1298
1299    pub async fn query_raw(
1300        &self,
1301        name: &str,
1302        args: serde_json::Value,
1303    ) -> Result<(String, serde_json::Value)> {
1304        self.control.query_plugin_raw(name, args).await
1305    }
1306
1307    pub async fn run_command<Op: lash_core::PluginCommand>(
1308        &self,
1309        args: Op::Args,
1310    ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1311        let receipt = self
1312            .control
1313            .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1314            .await?;
1315        Ok(lash_core::PluginCommandReceipt {
1316            output: decode_plugin_output::<Op>(receipt.output)?,
1317            events: receipt.events,
1318            queued_batches: receipt.queued_batches,
1319        })
1320    }
1321
1322    pub async fn run_command_raw(
1323        &self,
1324        name: &str,
1325        args: serde_json::Value,
1326    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1327        self.control.run_plugin_command_raw(name, args).await
1328    }
1329
1330    pub async fn run_task<Op: lash_core::PluginTask>(
1331        &self,
1332        args: Op::Args,
1333    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1334        self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1335            .await
1336    }
1337
1338    pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1339        &self,
1340        args: Op::Args,
1341        cancellation_token: CancellationToken,
1342    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1343        let receipt = self
1344            .control
1345            .run_plugin_task_raw_with_cancel(
1346                Op::NAME,
1347                encode_plugin_args::<Op>(args)?,
1348                cancellation_token,
1349            )
1350            .await?;
1351        Ok(lash_core::PluginTaskReceipt {
1352            output: decode_plugin_output::<Op>(receipt.output)?,
1353            events: receipt.events,
1354            queued_batches: receipt.queued_batches,
1355        })
1356    }
1357
1358    pub async fn run_task_raw(
1359        &self,
1360        name: &str,
1361        args: serde_json::Value,
1362    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1363        self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1364            .await
1365    }
1366
1367    pub async fn run_task_raw_with_cancel(
1368        &self,
1369        name: &str,
1370        args: serde_json::Value,
1371        cancellation_token: CancellationToken,
1372    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1373        self.control
1374            .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1375            .await
1376    }
1377}
1378
1379fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1380    serde_json::to_value(args).map_err(|err| {
1381        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1382            "invalid {} args: {err}",
1383            Op::NAME
1384        )))
1385    })
1386}
1387
1388fn decode_plugin_output<Op: lash_core::PluginOperation>(
1389    output: serde_json::Value,
1390) -> Result<Op::Output> {
1391    serde_json::from_value(output).map_err(|err| {
1392        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1393            "invalid {} output: {err}",
1394            Op::NAME
1395        )))
1396    })
1397}
1398
1399#[derive(Clone)]
1400pub struct ChildSessionAdmin {
1401    control: SessionAdmin,
1402}
1403
1404impl ChildSessionAdmin {
1405    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1406        self.control.create_child_session(request).await
1407    }
1408
1409    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1410        self.control.close_child_session(session_id).await
1411    }
1412
1413    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1414        self.control.activate_managed_session(session_id).await
1415    }
1416}
1417
1418#[derive(Clone)]
1419pub struct InjectionAdmin {
1420    control: SessionAdmin,
1421}
1422
1423impl InjectionAdmin {
1424    pub async fn inject_turn_input(
1425        &self,
1426        id: Option<String>,
1427        message: PluginMessage,
1428    ) -> Result<()> {
1429        self.control.inject_turn_input(id, message).await
1430    }
1431
1432    pub async fn inject_turn_inputs(
1433        &self,
1434        messages: Vec<lash_core::InjectedTurnInput>,
1435    ) -> Result<()> {
1436        self.control.inject_turn_inputs(messages).await
1437    }
1438}
1439
1440#[derive(Clone)]
1441pub struct ProtocolAdmin {
1442    control: SessionAdmin,
1443}
1444
1445impl ProtocolAdmin {
1446    pub async fn apply_session_extension(
1447        &self,
1448        extension: lash_core::ProtocolSessionExtensionHandle,
1449    ) -> Result<()> {
1450        self.control
1451            .apply_protocol_session_extension(extension)
1452            .await
1453    }
1454}