Skip to main content

lash/
admin.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7    pub(crate) core: LashCore,
8}
9
10impl Completions {
11    pub async fn resolve(
12        &self,
13        key: lash_core::AwaitEventKey,
14        resolution: lash_core::Resolution,
15    ) -> Result<lash_core::ResolveOutcome> {
16        self.core
17            .env
18            .core
19            .control
20            .effect_host
21            .resolve_await_event(&key, resolution)
22            .await
23            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24    }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29    pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33    pub async fn emit(
34        &self,
35        request: lash_core::TriggerOccurrenceRequest,
36        scoped_effect_controller: ScopedEffectController<'_>,
37    ) -> Result<lash_core::TriggerEmitReport> {
38        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39            EmbedError::Plugin(lash_core::PluginError::Session(
40                "trigger store is unavailable in this runtime".to_string(),
41            ))
42        })?;
43        let drivers = self.core.work_driver.drivers().await;
44        let router = lash_core::TriggerRouter::new(
45            Arc::clone(store),
46            self.core.env.process_registry.clone(),
47            drivers.process,
48        );
49        router
50            .emit(request, scoped_effect_controller.controller())
51            .await
52            .map_err(Into::into)
53    }
54
55    pub async fn subscriptions(
56        &self,
57        filter: lash_core::TriggerSubscriptionFilter,
58    ) -> Result<Vec<lash_core::TriggerRegistration>> {
59        let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60            EmbedError::Plugin(lash_core::PluginError::Session(
61                "trigger store is unavailable in this runtime".to_string(),
62            ))
63        })?;
64        let records = store.list_subscriptions(filter).await?;
65        Ok(records
66            .iter()
67            .map(lash_core::TriggerRegistration::from)
68            .collect())
69    }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74    pub(crate) core: LashCore,
75}
76
77impl Processes {
78    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79        self.core
80            .env
81            .process_registry
82            .as_ref()
83            .cloned()
84            .ok_or_else(|| {
85                EmbedError::Plugin(lash_core::PluginError::Session(
86                    "process registry is unavailable in this runtime".to_string(),
87                ))
88            })
89    }
90
91    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93    }
94
95    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96        let effect_id = command.effect_id();
97        lash_core::RuntimeInvocation::effect(
98            lash_core::runtime::RuntimeScope::new("runtime"),
99            effect_id.clone(),
100            lash_core::RuntimeEffectKind::Process,
101            effect_id,
102        )
103    }
104
105    async fn run_command(
106        &self,
107        command: lash_core::ProcessCommand,
108        scoped_effect_controller: ScopedEffectController<'_>,
109    ) -> Result<lash_core::ProcessEffectOutcome> {
110        let registry = self.registry()?;
111        let invocation = Self::process_invocation(&command);
112        let outcome = scoped_effect_controller
113            .controller()
114            .execute_effect(
115                lash_core::RuntimeEffectEnvelope::new(
116                    invocation,
117                    lash_core::RuntimeEffectCommand::process(command),
118                ),
119                lash_core::RuntimeEffectLocalExecutor::processes(registry),
120            )
121            .await
122            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123        match outcome {
124            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126                "process effect returned non-process outcome".to_string(),
127            ))),
128        }
129    }
130
131    pub async fn start(
132        &self,
133        request: lash_core::ProcessStartRequest,
134        scoped_effect_controller: ScopedEffectController<'_>,
135    ) -> Result<lash_core::ProcessRecord> {
136        let env_ref = match request.env_spec.as_ref() {
137            Some(env_spec) => Some(
138                lash_core::runtime::persist_process_execution_env(
139                    self.core.env.core.durability.process_env_store.as_ref(),
140                    env_spec,
141                )
142                .await?,
143            ),
144            None => None,
145        };
146        let grant = request.grant.clone();
147        let registration = request.into_registration(env_ref);
148        let command = lash_core::ProcessCommand::Start {
149            registration,
150            grant,
151            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152        };
153        let outcome = self
154            .run_command(command, scoped_effect_controller.clone())
155            .await?;
156        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158                "process start returned the wrong outcome".to_string(),
159            )));
160        };
161        if let Some(driver) = self.core.work_driver.drivers().await.process {
162            driver.claim_and_run_pending("admin_process_start").await?;
163        }
164        Ok(record)
165    }
166
167    pub async fn list(
168        &self,
169        filter: &lash_core::ProcessListFilter,
170    ) -> Result<Vec<lash_core::ObservedProcess>> {
171        self.make_observer()?.list(filter).await.map_err(Into::into)
172    }
173
174    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175        Ok(self.make_observer()?.process(process_id).await)
176    }
177
178    pub async fn events(
179        &self,
180        process_id: &str,
181        after_sequence: u64,
182    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183        self.make_observer()?
184            .events_after(process_id, after_sequence)
185            .await
186            .map_err(Into::into)
187    }
188
189    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190        self.registry()?
191            .await_process(process_id)
192            .await
193            .map_err(Into::into)
194    }
195
196    pub async fn cancel(
197        &self,
198        process_id: &str,
199        scoped_effect_controller: ScopedEffectController<'_>,
200    ) -> Result<lash_core::ProcessCancelSummary> {
201        let command = lash_core::ProcessCommand::Cancel {
202            process_id: process_id.to_string(),
203            reason: Some("requested by host".to_string()),
204        };
205        let outcome = self
206            .run_command(command, scoped_effect_controller.clone())
207            .await?;
208        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210                "process cancel returned the wrong outcome".to_string(),
211            )));
212        };
213        Ok(lash_core::ProcessCancelSummary::from_record(record))
214    }
215
216    pub async fn signal(
217        &self,
218        process_id: &str,
219        signal_name: impl Into<String>,
220        signal_id: impl Into<String>,
221        request: lash_core::ProcessEventAppendRequest,
222        scoped_effect_controller: ScopedEffectController<'_>,
223    ) -> Result<lash_core::ProcessEvent> {
224        let signal_name = signal_name.into();
225        let event_type = request.event_type.clone();
226        let payload = request.payload.clone();
227        let command = lash_core::ProcessCommand::Signal {
228            process_id: process_id.to_string(),
229            signal_name: signal_name.clone(),
230            signal_id: signal_id.into(),
231            request,
232        };
233        let outcome = self
234            .run_command(command, scoped_effect_controller.clone())
235            .await?;
236        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238                "process signal returned the wrong outcome".to_string(),
239            )));
240        };
241        let registry = self.registry()?;
242        let waiting_ordinal =
243            registry
244                .get_process(process_id)
245                .await
246                .and_then(|record| match record.wait {
247                    Some(lash_core::WaitState {
248                        kind:
249                            lash_core::WaitKind::Signal {
250                                name,
251                                event_type: wait_event_type,
252                                ordinal,
253                                ..
254                            },
255                        ..
256                    }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257                    _ => None,
258                });
259        let ordinal = match waiting_ordinal {
260            Some(ordinal) => ordinal,
261            None => {
262                registry
263                    .count_events_through(process_id, &event_type, event.sequence)
264                    .await?
265            }
266        };
267        if ordinal > 0 {
268            let key = scoped_effect_controller
269                .controller()
270                .await_event_key(
271                    &lash_core::ExecutionScope::process(process_id),
272                    lash_core::AwaitEventWaitIdentity::process_signal(
273                        process_id,
274                        &signal_name,
275                        ordinal,
276                    ),
277                )
278                .await
279                .map_err(|err| {
280                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281                })?;
282            let _ = scoped_effect_controller
283                .controller()
284                .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285                .await
286                .map_err(|err| {
287                    EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288                })?;
289        }
290        Ok(event)
291    }
292
293    pub async fn session_snapshot(
294        &self,
295        session_id: impl Into<String>,
296    ) -> Result<lash_core::ProcessWorkSnapshot> {
297        self.make_observer()?
298            .snapshot_for_session(session_id)
299            .await
300            .map_err(Into::into)
301    }
302
303    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304        self.make_observer()
305    }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310    pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314    pub fn config(&self) -> SessionConfigAdmin {
315        SessionConfigAdmin {
316            control: self.clone(),
317        }
318    }
319
320    pub fn tools(&self) -> ToolAdmin {
321        ToolAdmin {
322            control: self.clone(),
323        }
324    }
325
326    pub fn commands(&self) -> SessionCommandAdmin {
327        SessionCommandAdmin {
328            control: self.clone(),
329        }
330    }
331
332    pub fn triggers(&self) -> SessionTriggerAdmin {
333        SessionTriggerAdmin {
334            control: self.clone(),
335        }
336    }
337
338    pub fn state(&self) -> SessionStateAdmin {
339        SessionStateAdmin {
340            control: self.clone(),
341        }
342    }
343
344    pub fn children(&self) -> ChildSessionAdmin {
345        ChildSessionAdmin {
346            control: self.clone(),
347        }
348    }
349
350    pub fn injection(&self) -> InjectionAdmin {
351        InjectionAdmin {
352            control: self.clone(),
353        }
354    }
355
356    pub fn protocol(&self) -> ProtocolAdmin {
357        ProtocolAdmin {
358            control: self.clone(),
359        }
360    }
361
362    pub fn processes(&self) -> SessionProcessAdmin {
363        SessionProcessAdmin {
364            control: self.clone(),
365        }
366    }
367
368    /// Run `f` against the locked runtime writer, then publish the resulting
369    /// observation. The body is the canonical `lock → call → publish_from`
370    /// stamp shared by nearly every mutating control method; publish happens
371    /// unconditionally once the closure returns.
372    async fn with_writer<F, T>(&self, f: F) -> T
373    where
374        F: AsyncFnOnce(&mut LashRuntime) -> T,
375    {
376        let writer = self.runtime.writer();
377        let mut runtime = writer.lock().await;
378        let value = f(&mut runtime).await;
379        self.runtime.publish_from(&runtime);
380        value
381    }
382
383    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384        self.update_session_config(patch.provider, patch.model, patch.prompt)
385            .await?;
386        Ok(())
387    }
388
389    async fn update_session_config(
390        &self,
391        provider: Option<ProviderHandle>,
392        model: Option<lash_core::ModelSpec>,
393        prompt: Option<PromptLayer>,
394    ) -> Result<()> {
395        self.with_writer(async |runtime: &mut LashRuntime| {
396            runtime.update_session_config(provider, model, prompt).await;
397        })
398        .await;
399        Ok(())
400    }
401
402    async fn export_state(&self) -> lash_core::SessionSnapshot {
403        self.runtime.observe().read_view.to_snapshot()
404    }
405
406    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407        self.with_writer(async |runtime: &mut LashRuntime| {
408            runtime
409                .append_session_nodes(lash_core::AppendSessionNodesRequest {
410                    nodes: messages
411                        .into_iter()
412                        .map(lash_core::SessionAppendNode::message)
413                        .collect(),
414                    requires_ancestor_node_id: None,
415                })
416                .await
417                .map(|_| ())
418                .map_err(Into::into)
419        })
420        .await
421    }
422
423    async fn append_plugin_body(
424        &self,
425        plugin_type: impl Into<String>,
426        body: serde_json::Value,
427    ) -> Result<()> {
428        self.with_writer(async |runtime: &mut LashRuntime| {
429            runtime
430                .append_session_nodes(lash_core::AppendSessionNodesRequest {
431                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432                    requires_ancestor_node_id: None,
433                })
434                .await
435                .map(|_| ())
436                .map_err(Into::into)
437        })
438        .await
439    }
440
441    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442        self.with_writer(async |runtime: &mut LashRuntime| {
443            runtime.set_persisted_state(state).map_err(Into::into)
444        })
445        .await
446    }
447
448    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449        self.with_writer(async |runtime: &mut LashRuntime| {
450            runtime.set_prompt_template(template).await;
451        })
452        .await;
453        Ok(())
454    }
455
456    async fn clear_prompt_template(&self) -> Result<()> {
457        self.with_writer(async |runtime: &mut LashRuntime| {
458            runtime.clear_prompt_template().await;
459        })
460        .await;
461        Ok(())
462    }
463
464    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465        self.with_writer(async |runtime: &mut LashRuntime| {
466            runtime.add_prompt_contribution(contribution).await;
467        })
468        .await;
469        Ok(())
470    }
471
472    async fn replace_prompt_slot(
473        &self,
474        slot: PromptSlot,
475        contributions: impl IntoIterator<Item = PromptContribution>,
476    ) -> Result<()> {
477        self.with_writer(async |runtime: &mut LashRuntime| {
478            runtime.replace_prompt_slot(slot, contributions).await;
479        })
480        .await;
481        Ok(())
482    }
483
484    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485        self.with_writer(async |runtime: &mut LashRuntime| {
486            runtime.clear_prompt_slot(slot).await;
487        })
488        .await;
489        Ok(())
490    }
491
492    async fn apply_protocol_session_extension(
493        &self,
494        extension: lash_core::ProtocolSessionExtensionHandle,
495    ) -> Result<()> {
496        self.with_writer(async |runtime: &mut LashRuntime| {
497            runtime
498                .apply_protocol_session_extension(extension)
499                .await
500                .map_err(Into::into)
501        })
502        .await
503    }
504
505    async fn branch_to_node(
506        &self,
507        target_leaf: Option<String>,
508    ) -> Result<lash_core::SessionSnapshot> {
509        self.with_writer(async |runtime: &mut LashRuntime| {
510            runtime
511                .branch_to_node(target_leaf)
512                .await
513                .map_err(Into::into)
514        })
515        .await
516    }
517
518    async fn await_background_work(&self) -> Result<()> {
519        self.with_writer(async |runtime: &mut LashRuntime| {
520            runtime.await_background_work().await.map_err(Into::into)
521        })
522        .await
523    }
524
525    async fn refresh_tool_catalog(&self) -> Result<()> {
526        self.with_writer(async |runtime: &mut LashRuntime| {
527            runtime
528                .refresh_session_tool_catalog()
529                .await
530                .map_err(Into::into)
531        })
532        .await
533    }
534
535    async fn submit_session_command(
536        &self,
537        command: lash_core::SessionCommand,
538        idempotency_key: impl Into<String>,
539    ) -> Result<lash_core::SessionCommandReceipt> {
540        let idempotency_key = idempotency_key.into();
541        self.with_writer(async |runtime: &mut LashRuntime| {
542            runtime
543                .submit_session_command(command, idempotency_key)
544                .await
545                .map_err(Into::into)
546        })
547        .await
548    }
549
550    async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551        self.with_writer(async |runtime: &mut LashRuntime| {
552            runtime
553                .list_trigger_registrations()
554                .await
555                .map_err(Into::into)
556        })
557        .await
558    }
559
560    async fn trigger_registrations_by_source_type(
561        &self,
562        source_type: impl Into<lash_core::TriggerEventType>,
563    ) -> Result<Vec<lash_core::TriggerRegistration>> {
564        self.with_writer(async |runtime: &mut LashRuntime| {
565            runtime
566                .trigger_registrations_by_source_type(source_type)
567                .await
568                .map_err(Into::into)
569        })
570        .await
571    }
572
573    async fn query_plugin_raw(
574        &self,
575        name: &str,
576        args: serde_json::Value,
577    ) -> Result<(String, serde_json::Value)> {
578        let observation = self.runtime.observe();
579        let session_id = observation.session_id().to_string();
580        observation
581            .query_plugin(name, args, Some(session_id))
582            .await
583            .map_err(Into::into)
584    }
585
586    async fn run_plugin_command_raw(
587        &self,
588        name: &str,
589        args: serde_json::Value,
590    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
591        let session_id = self.runtime.observe().session_id().to_string();
592        let writer = self.runtime.writer();
593        let mut runtime = writer.lock().await;
594        let receipt = runtime
595            .run_plugin_command(name, args, Some(session_id))
596            .await?;
597        self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
598        self.runtime.publish_from(&runtime);
599        Ok(receipt)
600    }
601
602    async fn run_plugin_task_raw_with_cancel(
603        &self,
604        name: &str,
605        args: serde_json::Value,
606        cancellation_token: CancellationToken,
607    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
608        let session_id = self.runtime.observe().session_id().to_string();
609        let writer = self.runtime.writer();
610        let mut runtime = writer.lock().await;
611        let scope_id = format!(
612            "{session_id}:plugin_task:{name}:{}",
613            lash_core::TurnActivityId::fresh().0
614        );
615        let scoped_effect_controller = runtime
616            .effect_host()
617            .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
618            .map_err(EmbedError::Runtime)?
619            .ok_or_else(|| {
620                EmbedError::Plugin(lash_core::PluginError::Session(
621                    "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
622                ))
623            })?;
624        let receipt = runtime
625            .run_plugin_task(
626                name,
627                args,
628                Some(session_id),
629                scoped_effect_controller,
630                cancellation_token,
631            )
632            .await?;
633        self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
634        self.runtime.publish_from(&runtime);
635        Ok(receipt)
636    }
637
638    fn record_plugin_operation_observations(
639        &self,
640        events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
641        queued_batches: &[lash_core::runtime::QueuedWorkBatch],
642    ) {
643        for owned in events {
644            self.runtime
645                .record_turn_activity(lash_core::TurnActivity::independent(
646                    lash_core::TurnEvent::PluginRuntime {
647                        plugin_id: owned.plugin_id.clone(),
648                        event: owned.value.clone(),
649                    },
650                ));
651        }
652        if !queued_batches.is_empty() {
653            self.runtime.record_queue_changed(
654                lash_core::SessionQueueEventKind::Enqueued,
655                queued_batches
656                    .iter()
657                    .map(|batch| batch.batch_id.clone())
658                    .collect(),
659            );
660        }
661    }
662
663    async fn compact_context(
664        &self,
665        instructions: Option<String>,
666        scoped_effect_controller: ScopedEffectController<'_>,
667    ) -> Result<bool> {
668        self.with_writer(async |runtime: &mut LashRuntime| {
669            runtime
670                .compact_context(instructions, scoped_effect_controller)
671                .await
672                .map_err(Into::into)
673        })
674        .await
675    }
676
677    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
678        self.with_writer(async |runtime: &mut LashRuntime| {
679            runtime.await_background_work().await?;
680            Ok(runtime.export_persisted_state())
681        })
682        .await
683    }
684
685    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
686        Ok(self.runtime.observe().list_process_handles().await)
687    }
688
689    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
690        Ok(self.runtime.observe().list_all_process_handles().await)
691    }
692
693    async fn start_process(
694        &self,
695        request: lash_core::ProcessStartRequest,
696        scoped_effect_controller: ScopedEffectController<'_>,
697    ) -> Result<lash_core::ProcessHandleSummary> {
698        let writer = self.runtime.writer();
699        let runtime = writer.lock().await;
700        let session_id = runtime.session_id().to_string();
701        let processes = runtime.process_service()?;
702        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703        let summary = processes
704            .start_from_request(&session_id, request, scope)
705            .await
706            .map_err(EmbedError::Plugin)?;
707        self.runtime.record_process_changed(
708            SessionProcessEventKind::Started,
709            vec![summary.process_id.clone()],
710        );
711        Ok(summary)
712    }
713
714    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
715        self.runtime
716            .writer()
717            .lock()
718            .await
719            .session_state_service()
720            .map_err(Into::into)
721    }
722
723    async fn cancel_process(
724        &self,
725        process_id: &str,
726        scoped_effect_controller: ScopedEffectController<'_>,
727    ) -> Result<lash_core::ProcessCancelSummary> {
728        let writer = self.runtime.writer();
729        let runtime = writer.lock().await;
730        let session_id = runtime.session_id().to_string();
731        let processes = runtime.process_service()?;
732        let cancel_ability = runtime.process_cancel_ability();
733        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
734        let summary = cancel_ability
735            .cancel_summary(
736                processes.as_ref(),
737                lash_core::ProcessCancelRequest::new(
738                    &session_id,
739                    process_id,
740                    scope,
741                    lash_core::ProcessCancelSource::HostApi,
742                )
743                .with_reason("requested by host API"),
744            )
745            .await
746            .map_err(EmbedError::Plugin)?;
747        self.runtime.record_process_changed(
748            SessionProcessEventKind::Cancelled,
749            vec![summary.process_id.clone()],
750        );
751        Ok(summary)
752    }
753
754    async fn cancel_visible_processes(
755        &self,
756        scoped_effect_controller: ScopedEffectController<'_>,
757    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
758        let writer = self.runtime.writer();
759        let runtime = writer.lock().await;
760        let session_id = runtime.session_id().to_string();
761        let processes = runtime.process_service()?;
762        let cancel_ability = runtime.process_cancel_ability();
763        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
764        let summaries = cancel_ability
765            .cancel_all_visible(
766                processes.as_ref(),
767                lash_core::ProcessCancelAllRequest::new(
768                    &session_id,
769                    scope,
770                    lash_core::ProcessCancelSource::HostApi,
771                )
772                .with_reason("requested by host API"),
773            )
774            .await
775            .map_err(EmbedError::Plugin)?;
776        self.runtime.record_process_changed(
777            SessionProcessEventKind::Cancelled,
778            summaries
779                .iter()
780                .map(|summary| summary.process_id.clone())
781                .collect(),
782        );
783        Ok(summaries)
784    }
785
786    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
787        self.with_writer(async |runtime: &mut LashRuntime| {
788            runtime.snapshot_execution_state().await.map_err(Into::into)
789        })
790        .await
791    }
792
793    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
794        self.with_writer(async |runtime: &mut LashRuntime| {
795            runtime
796                .restore_execution_state(bytes)
797                .await
798                .map_err(Into::into)
799        })
800        .await
801    }
802
803    async fn tool_state(&self) -> Result<ToolState> {
804        self.runtime.observe().tool_state.clone().ok_or_else(|| {
805            EmbedError::Session(SessionError::Protocol(
806                "runtime session not available".to_string(),
807            ))
808        })
809    }
810
811    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
812        self.with_writer(async |runtime: &mut LashRuntime| {
813            runtime
814                .apply_tool_state(state)
815                .await
816                .map_err(EmbedError::from)
817        })
818        .await
819    }
820
821    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
822        self.with_writer(async |runtime: &mut LashRuntime| {
823            runtime
824                .restore_tool_state(state)
825                .await
826                .map_err(EmbedError::from)
827        })
828        .await
829    }
830
831    async fn set_tool_membership(&self, tool_id: lash_core::ToolId, present: bool) -> Result<u64> {
832        self.set_tool_membership_many(&[(tool_id, present)]).await
833    }
834
835    async fn set_tool_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
836        let mut state = self.tool_state().await?;
837        for (tool_id, present) in updates {
838            state
839                .set_membership(tool_id, *present)
840                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
841        }
842        self.apply_tool_state(state).await
843    }
844
845    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
846        Ok(self.tool_state().await?.tool_manifests())
847    }
848
849    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
850        let tool_registry = self.tool_registry().await?;
851        let handle = tool_registry
852            .add_tool_provider(provider)
853            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
854        self.refresh_tool_catalog().await?;
855        Ok(handle)
856    }
857
858    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
859        let tool_registry = self.tool_registry().await?;
860        let generation = tool_registry
861            .remove_source(handle)
862            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
863        self.refresh_tool_catalog().await?;
864        Ok(generation)
865    }
866
867    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
868        let writer = self.runtime.writer();
869        let runtime = writer.lock().await;
870        let lifecycle = runtime.session_lifecycle_service()?;
871        lifecycle.create_session(request).await.map_err(Into::into)
872    }
873
874    async fn close_child_session(&self, session_id: &str) -> Result<()> {
875        let writer = self.runtime.writer();
876        let runtime = writer.lock().await;
877        let lifecycle = runtime.session_lifecycle_service()?;
878        lifecycle
879            .close_session(session_id)
880            .await
881            .map_err(Into::into)
882    }
883
884    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
885        self.with_writer(async |runtime: &mut LashRuntime| {
886            runtime
887                .activate_managed_session(session_id)
888                .await
889                .map_err(Into::into)
890        })
891        .await
892    }
893
894    async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
895        self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
896            .await
897    }
898
899    async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
900        for input in messages {
901            let source_key = input.id.map(|id| format!("injection:{id}"));
902            let turn_input = turn_input_from_plugin_message(input.message);
903            self.runtime
904                .enqueue_turn_input(
905                    turn_input,
906                    lash_core::DeliveryPolicy::EarliestSafeBoundary,
907                    lash_core::SlotPolicy::Join,
908                    source_key,
909                )
910                .await
911                .map(|_| ())
912                .map_err(EmbedError::Runtime)?;
913        }
914        Ok(())
915    }
916
917    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
918        self.runtime
919            .writer()
920            .lock()
921            .await
922            .plugin_session()
923            .map(|session| session.tool_registry())
924            .ok_or_else(|| {
925                EmbedError::Session(SessionError::Protocol(
926                    "tool registry is unavailable in this runtime session".to_string(),
927                ))
928            })
929    }
930}
931
932fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
933    let mut input = TurnInput::empty();
934    if !message.content.is_empty() {
935        input.items.push(InputItem::Text {
936            text: message.content,
937        });
938    }
939    for (index, bytes) in message.images.into_iter().enumerate() {
940        let id = format!("injected-image-{index}");
941        input.items.push(InputItem::ImageRef { id: id.clone() });
942        input.image_blobs.insert(id, bytes);
943    }
944    input
945}
946
947#[derive(Clone)]
948pub struct SessionConfigAdmin {
949    control: SessionAdmin,
950}
951
952impl SessionConfigAdmin {
953    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
954        self.control.update_config(patch).await
955    }
956
957    pub async fn update_session_config(
958        &self,
959        provider: Option<ProviderHandle>,
960        model: Option<lash_core::ModelSpec>,
961        prompt: Option<PromptLayer>,
962    ) -> Result<()> {
963        self.control
964            .update_session_config(provider, model, prompt)
965            .await
966    }
967
968    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
969        self.control.set_prompt_template(template).await
970    }
971
972    pub async fn clear_prompt_template(&self) -> Result<()> {
973        self.control.clear_prompt_template().await
974    }
975
976    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
977        self.control.add_prompt_contribution(contribution).await
978    }
979
980    pub async fn replace_prompt_slot(
981        &self,
982        slot: PromptSlot,
983        contributions: impl IntoIterator<Item = PromptContribution>,
984    ) -> Result<()> {
985        self.control.replace_prompt_slot(slot, contributions).await
986    }
987
988    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
989        self.control.clear_prompt_slot(slot).await
990    }
991}
992
993#[derive(Clone)]
994pub struct ToolAdmin {
995    control: SessionAdmin,
996}
997
998impl ToolAdmin {
999    pub(crate) fn new(control: SessionAdmin) -> Self {
1000        Self { control }
1001    }
1002}
1003
1004impl ToolAdmin {
1005    pub async fn state(&self) -> Result<ToolState> {
1006        self.control.tool_state().await
1007    }
1008
1009    pub fn advanced(&self) -> AdvancedToolAdmin {
1010        AdvancedToolAdmin {
1011            control: self.control.clone(),
1012        }
1013    }
1014
1015    /// Toggle Tool Catalog membership for a tool. `present` adds it as a
1016    /// member; `!present` removes it. Membership is the execution gate.
1017    pub async fn set_membership(
1018        &self,
1019        tool_id: impl Into<lash_core::ToolId>,
1020        present: bool,
1021    ) -> Result<u64> {
1022        self.control
1023            .set_tool_membership(tool_id.into(), present)
1024            .await
1025    }
1026
1027    pub async fn set_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
1028        self.control.set_tool_membership_many(updates).await
1029    }
1030
1031    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1032        self.control.active_tool_manifests().await
1033    }
1034
1035    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1036        self.control.add_tool_provider(provider).await
1037    }
1038
1039    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1040        self.control.remove_tool_source(handle).await
1041    }
1042}
1043
1044#[derive(Clone)]
1045pub struct AdvancedToolAdmin {
1046    control: SessionAdmin,
1047}
1048
1049impl AdvancedToolAdmin {
1050    /// Replace the entire tool-state snapshot.
1051    ///
1052    /// This is a generation-checked escape hatch for hosts that intentionally
1053    /// edit the full snapshot. Prefer `ToolAdmin` membership methods for
1054    /// ordinary tool policy changes.
1055    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1056        self.control.apply_tool_state(state).await
1057    }
1058
1059    /// Restore a persisted tool-state snapshot, adopting its generation.
1060    ///
1061    /// Use this when re-applying a snapshot read from durable storage (session
1062    /// resume), not an edited delta: it reconstructs the exact persisted surface
1063    /// idempotently rather than requiring the snapshot to match the current
1064    /// generation. A cold resume of a session whose surface reached generation
1065    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
1066    ///
1067    /// Persisted tools whose source is not currently registered (e.g. a
1068    /// detached MCP server) do not fail the restore: they are kept as orphaned
1069    /// non-members, listed in the returned [`ToolRestoreReport`], and rebind
1070    /// automatically when a source re-advertises the same tool.
1071    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1072        self.control.restore_tool_state(state).await
1073    }
1074}
1075
1076#[derive(Clone)]
1077pub struct SessionCommandAdmin {
1078    control: SessionAdmin,
1079}
1080
1081impl SessionCommandAdmin {
1082    /// Enqueue an unconditional tool-catalog refresh. The command drains
1083    /// asynchronously and recomputes the surface from live sources, so it
1084    /// takes no generation guard — any generation observed at enqueue time
1085    /// could legitimately have advanced by drain time.
1086    pub async fn refresh_tool_catalog(
1087        &self,
1088        reason: impl Into<String>,
1089        idempotency_key: impl Into<String>,
1090    ) -> Result<lash_core::SessionCommandReceipt> {
1091        self.control
1092            .submit_session_command(
1093                lash_core::SessionCommand::RefreshToolCatalog {
1094                    reason: reason.into(),
1095                },
1096                idempotency_key,
1097            )
1098            .await
1099    }
1100
1101    pub async fn reset(
1102        &self,
1103        reason: impl Into<String>,
1104        idempotency_key: impl Into<String>,
1105    ) -> Result<lash_core::SessionCommandReceipt> {
1106        self.control
1107            .submit_session_command(
1108                lash_core::SessionCommand::ResetSession {
1109                    reason: reason.into(),
1110                },
1111                idempotency_key,
1112            )
1113            .await
1114    }
1115}
1116
1117/// Session-scoped read controls for Lashlang trigger registrations.
1118#[derive(Clone)]
1119pub struct SessionTriggerAdmin {
1120    control: SessionAdmin,
1121}
1122
1123impl SessionTriggerAdmin {
1124    /// Return every trigger registration in the session.
1125    ///
1126    /// This is an admin/introspection view. Source owners should prefer
1127    /// [`Self::by_source_type`] so they only inspect registrations for the
1128    /// concrete source type they own.
1129    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1130        self.control.list_trigger_registrations().await
1131    }
1132
1133    /// Return registrations whose source value has the given host descriptor type.
1134    ///
1135    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1136    /// source uses it to inspect registrations for keys it may schedule and emit.
1137    pub async fn by_source_type(
1138        &self,
1139        source_type: impl Into<lash_core::TriggerEventType>,
1140    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1141        self.control
1142            .trigger_registrations_by_source_type(source_type)
1143            .await
1144    }
1145}
1146
1147#[derive(Clone)]
1148pub struct SessionProcessAdmin {
1149    control: SessionAdmin,
1150}
1151
1152impl SessionProcessAdmin {
1153    pub(crate) fn new(control: SessionAdmin) -> Self {
1154        Self { control }
1155    }
1156
1157    pub async fn start(
1158        &self,
1159        request: lash_core::ProcessStartRequest,
1160        scoped_effect_controller: ScopedEffectController<'_>,
1161    ) -> Result<lash_core::ProcessHandleSummary> {
1162        self.control
1163            .start_process(request, scoped_effect_controller)
1164            .await
1165    }
1166
1167    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1168        self.control.list_process_handles().await
1169    }
1170
1171    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1172        self.control.list_all_process_handles().await
1173    }
1174
1175    pub async fn await_all(&self) -> Result<()> {
1176        self.control.await_background_work().await
1177    }
1178
1179    pub async fn cancel(
1180        &self,
1181        process_id: &str,
1182        scoped_effect_controller: ScopedEffectController<'_>,
1183    ) -> Result<lash_core::ProcessCancelSummary> {
1184        self.control
1185            .cancel_process(process_id, scoped_effect_controller)
1186            .await
1187    }
1188
1189    pub async fn cancel_all(
1190        &self,
1191        scoped_effect_controller: ScopedEffectController<'_>,
1192    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1193        self.control
1194            .cancel_visible_processes(scoped_effect_controller)
1195            .await
1196    }
1197}
1198
1199#[derive(Clone)]
1200pub struct SessionStateAdmin {
1201    control: SessionAdmin,
1202}
1203
1204impl SessionStateAdmin {
1205    pub async fn export(&self) -> lash_core::SessionSnapshot {
1206        self.control.export_state().await
1207    }
1208
1209    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1210        self.control.append_messages(messages).await
1211    }
1212
1213    pub async fn append_plugin_body(
1214        &self,
1215        plugin_type: impl Into<String>,
1216        body: serde_json::Value,
1217    ) -> Result<()> {
1218        self.control.append_plugin_body(plugin_type, body).await
1219    }
1220
1221    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1222        self.control.set_persisted_state(state).await
1223    }
1224
1225    pub async fn branch_to_node(
1226        &self,
1227        target_leaf: Option<String>,
1228    ) -> Result<lash_core::SessionSnapshot> {
1229        self.control.branch_to_node(target_leaf).await
1230    }
1231
1232    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1233        self.control.persist_current_state().await
1234    }
1235
1236    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1237        self.control.session_state_service().await
1238    }
1239
1240    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1241        self.control.snapshot_execution_state().await
1242    }
1243
1244    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1245        self.control.restore_execution_state(bytes).await
1246    }
1247
1248    pub async fn compact_context(
1249        &self,
1250        instructions: Option<String>,
1251        scoped_effect_controller: ScopedEffectController<'_>,
1252    ) -> Result<bool> {
1253        self.control
1254            .compact_context(instructions, scoped_effect_controller)
1255            .await
1256    }
1257}
1258
1259#[derive(Clone)]
1260pub struct PluginOperations {
1261    pub(crate) control: SessionAdmin,
1262}
1263
1264impl PluginOperations {
1265    pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1266        let (_plugin_id, output) = self
1267            .control
1268            .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1269            .await?;
1270        decode_plugin_output::<Op>(output)
1271    }
1272
1273    pub async fn query_raw(
1274        &self,
1275        name: &str,
1276        args: serde_json::Value,
1277    ) -> Result<(String, serde_json::Value)> {
1278        self.control.query_plugin_raw(name, args).await
1279    }
1280
1281    pub async fn run_command<Op: lash_core::PluginCommand>(
1282        &self,
1283        args: Op::Args,
1284    ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1285        let receipt = self
1286            .control
1287            .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1288            .await?;
1289        Ok(lash_core::PluginCommandReceipt {
1290            output: decode_plugin_output::<Op>(receipt.output)?,
1291            events: receipt.events,
1292            queued_batches: receipt.queued_batches,
1293        })
1294    }
1295
1296    pub async fn run_command_raw(
1297        &self,
1298        name: &str,
1299        args: serde_json::Value,
1300    ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1301        self.control.run_plugin_command_raw(name, args).await
1302    }
1303
1304    pub async fn run_task<Op: lash_core::PluginTask>(
1305        &self,
1306        args: Op::Args,
1307    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1308        self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1309            .await
1310    }
1311
1312    pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1313        &self,
1314        args: Op::Args,
1315        cancellation_token: CancellationToken,
1316    ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1317        let receipt = self
1318            .control
1319            .run_plugin_task_raw_with_cancel(
1320                Op::NAME,
1321                encode_plugin_args::<Op>(args)?,
1322                cancellation_token,
1323            )
1324            .await?;
1325        Ok(lash_core::PluginTaskReceipt {
1326            output: decode_plugin_output::<Op>(receipt.output)?,
1327            events: receipt.events,
1328            queued_batches: receipt.queued_batches,
1329        })
1330    }
1331
1332    pub async fn run_task_raw(
1333        &self,
1334        name: &str,
1335        args: serde_json::Value,
1336    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1337        self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1338            .await
1339    }
1340
1341    pub async fn run_task_raw_with_cancel(
1342        &self,
1343        name: &str,
1344        args: serde_json::Value,
1345        cancellation_token: CancellationToken,
1346    ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1347        self.control
1348            .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1349            .await
1350    }
1351}
1352
1353fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1354    serde_json::to_value(args).map_err(|err| {
1355        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1356            "invalid {} args: {err}",
1357            Op::NAME
1358        )))
1359    })
1360}
1361
1362fn decode_plugin_output<Op: lash_core::PluginOperation>(
1363    output: serde_json::Value,
1364) -> Result<Op::Output> {
1365    serde_json::from_value(output).map_err(|err| {
1366        EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1367            "invalid {} output: {err}",
1368            Op::NAME
1369        )))
1370    })
1371}
1372
1373#[derive(Clone)]
1374pub struct ChildSessionAdmin {
1375    control: SessionAdmin,
1376}
1377
1378impl ChildSessionAdmin {
1379    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1380        self.control.create_child_session(request).await
1381    }
1382
1383    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1384        self.control.close_child_session(session_id).await
1385    }
1386
1387    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1388        self.control.activate_managed_session(session_id).await
1389    }
1390}
1391
1392#[derive(Clone)]
1393pub struct InjectionAdmin {
1394    control: SessionAdmin,
1395}
1396
1397impl InjectionAdmin {
1398    pub async fn inject_turn_input(
1399        &self,
1400        id: Option<String>,
1401        message: PluginMessage,
1402    ) -> Result<()> {
1403        self.control.inject_turn_input(id, message).await
1404    }
1405
1406    pub async fn inject_turn_inputs(
1407        &self,
1408        messages: Vec<lash_core::InjectedTurnInput>,
1409    ) -> Result<()> {
1410        self.control.inject_turn_inputs(messages).await
1411    }
1412}
1413
1414#[derive(Clone)]
1415pub struct ProtocolAdmin {
1416    control: SessionAdmin,
1417}
1418
1419impl ProtocolAdmin {
1420    pub async fn apply_session_extension(
1421        &self,
1422        extension: lash_core::ProtocolSessionExtensionHandle,
1423    ) -> Result<()> {
1424        self.control
1425            .apply_protocol_session_extension(extension)
1426            .await
1427    }
1428}