Skip to main content

lash/
control.rs

1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct HostEventsControl {
7    pub(crate) core: LashCore,
8}
9
10impl HostEventsControl {
11    pub async fn emit(
12        &self,
13        request: lash_core::HostEventOccurrenceRequest,
14        scoped_effect_controller: ScopedEffectController<'_>,
15    ) -> Result<lash_core::HostEventEmitReport> {
16        let store = self.core.env.host_event_store.as_ref().ok_or_else(|| {
17            EmbedError::Plugin(lash_core::PluginError::Session(
18                "host event store is unavailable in this runtime".to_string(),
19            ))
20        })?;
21        let process_work_poke = self.core.process_work_runner.poke().await;
22        let router = lash_core::HostEventRouter::new(
23            Arc::clone(store),
24            Arc::clone(&self.core.env.core.durability.lashlang_artifact_store),
25            self.core.env.process_registry.clone(),
26            process_work_poke,
27            self.core.env.core.profile.host_profile_id.clone(),
28        );
29        router
30            .emit(request, scoped_effect_controller.controller())
31            .await
32            .map_err(Into::into)
33    }
34
35    pub async fn subscriptions(
36        &self,
37        filter: lash_core::TriggerSubscriptionFilter,
38    ) -> Result<Vec<lash_core::TriggerRegistration>> {
39        let store = self.core.env.host_event_store.as_ref().ok_or_else(|| {
40            EmbedError::Plugin(lash_core::PluginError::Session(
41                "host event store is unavailable in this runtime".to_string(),
42            ))
43        })?;
44        let records = store.list_subscriptions(filter).await?;
45        Ok(records
46            .iter()
47            .map(lash_core::TriggerRegistration::from)
48            .collect())
49    }
50}
51
52#[derive(Clone)]
53pub struct Processes {
54    pub(crate) core: LashCore,
55}
56
57impl Processes {
58    fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
59        self.core
60            .env
61            .process_registry
62            .as_ref()
63            .cloned()
64            .ok_or_else(|| {
65                EmbedError::Plugin(lash_core::PluginError::Session(
66                    "process registry is unavailable in this runtime".to_string(),
67                ))
68            })
69    }
70
71    fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
72        Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
73    }
74
75    fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
76        let effect_id = command.effect_id();
77        lash_core::RuntimeInvocation::effect(
78            lash_core::runtime::RuntimeScope::new("runtime"),
79            effect_id.clone(),
80            lash_core::RuntimeEffectKind::Process,
81            effect_id,
82        )
83    }
84
85    async fn run_command(
86        &self,
87        command: lash_core::ProcessCommand,
88        scoped_effect_controller: ScopedEffectController<'_>,
89    ) -> Result<lash_core::ProcessEffectOutcome> {
90        let registry = self.registry()?;
91        let invocation = Self::process_invocation(&command);
92        let outcome = scoped_effect_controller
93            .controller()
94            .execute_effect(
95                lash_core::RuntimeEffectEnvelope::new(
96                    invocation,
97                    lash_core::RuntimeEffectCommand::process(command),
98                ),
99                lash_core::RuntimeEffectLocalExecutor::process_control(registry),
100            )
101            .await
102            .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
103        match outcome {
104            lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
105            _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
106                "process effect returned non-process outcome".to_string(),
107            ))),
108        }
109    }
110
111    pub async fn start(
112        &self,
113        request: lash_core::ProcessStartRequest,
114        scoped_effect_controller: ScopedEffectController<'_>,
115    ) -> Result<lash_core::ProcessRecord> {
116        let env_ref = match request.env_spec.as_ref() {
117            Some(env_spec) => Some(
118                lash_core::runtime::persist_process_execution_env(
119                    self.core
120                        .env
121                        .core
122                        .durability
123                        .lashlang_artifact_store
124                        .as_ref(),
125                    env_spec,
126                )
127                .await?,
128            ),
129            None => None,
130        };
131        let grant = request.grant.clone();
132        let registration =
133            request.into_registration(self.core.env.core.profile.host_profile_id.clone(), env_ref);
134        let command = lash_core::ProcessCommand::Start {
135            registration,
136            grant,
137            execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
138        };
139        let outcome = self.run_command(command, scoped_effect_controller).await?;
140        let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
141            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
142                "process start returned the wrong outcome".to_string(),
143            )));
144        };
145        if let Some(poke) = self.core.process_work_runner.poke().await {
146            poke.poke();
147        }
148        Ok(record)
149    }
150
151    pub async fn list(
152        &self,
153        filter: &lash_core::ProcessListFilter,
154    ) -> Result<Vec<lash_core::ObservedProcess>> {
155        self.make_observer()?.list(filter).await.map_err(Into::into)
156    }
157
158    pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
159        Ok(self.make_observer()?.process(process_id).await)
160    }
161
162    pub async fn events(
163        &self,
164        process_id: &str,
165        after_sequence: u64,
166    ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
167        self.make_observer()?
168            .events_after(process_id, after_sequence)
169            .await
170            .map_err(Into::into)
171    }
172
173    pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
174        self.registry()?
175            .await_process(process_id)
176            .await
177            .map_err(Into::into)
178    }
179
180    pub async fn cancel(
181        &self,
182        process_id: &str,
183        scoped_effect_controller: ScopedEffectController<'_>,
184    ) -> Result<lash_core::ProcessCancelSummary> {
185        let command = lash_core::ProcessCommand::Cancel {
186            process_id: process_id.to_string(),
187            reason: Some("requested by host".to_string()),
188        };
189        let outcome = self.run_command(command, scoped_effect_controller).await?;
190        let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
191            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
192                "process cancel returned the wrong outcome".to_string(),
193            )));
194        };
195        Ok(lash_core::ProcessCancelSummary::from_record(record))
196    }
197
198    pub async fn signal(
199        &self,
200        process_id: &str,
201        signal_name: impl Into<String>,
202        signal_id: impl Into<String>,
203        request: lash_core::ProcessEventAppendRequest,
204        scoped_effect_controller: ScopedEffectController<'_>,
205    ) -> Result<lash_core::ProcessEvent> {
206        let command = lash_core::ProcessCommand::Signal {
207            process_id: process_id.to_string(),
208            signal_name: signal_name.into(),
209            signal_id: signal_id.into(),
210            request,
211        };
212        let outcome = self.run_command(command, scoped_effect_controller).await?;
213        let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
214            return Err(EmbedError::Plugin(lash_core::PluginError::Session(
215                "process signal returned the wrong outcome".to_string(),
216            )));
217        };
218        Ok(event)
219    }
220
221    pub async fn session_snapshot(
222        &self,
223        session_id: impl Into<String>,
224    ) -> Result<lash_core::ProcessWorkSnapshot> {
225        self.make_observer()?
226            .snapshot_for_session(session_id)
227            .await
228            .map_err(Into::into)
229    }
230
231    pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
232        self.make_observer()
233    }
234}
235
236#[derive(Clone)]
237pub struct SessionControl {
238    pub(crate) runtime: RuntimeHandle,
239}
240
241impl SessionControl {
242    pub fn config(&self) -> ConfigControl {
243        ConfigControl {
244            control: self.clone(),
245        }
246    }
247
248    pub fn tools(&self) -> ToolsControl {
249        ToolsControl {
250            control: self.clone(),
251        }
252    }
253
254    pub fn commands(&self) -> SessionCommandsControl {
255        SessionCommandsControl {
256            control: self.clone(),
257        }
258    }
259
260    pub fn triggers(&self) -> TriggersControl {
261        TriggersControl {
262            control: self.clone(),
263        }
264    }
265
266    pub fn state(&self) -> StateControl {
267        StateControl {
268            control: self.clone(),
269        }
270    }
271
272    pub fn children(&self) -> ChildrenControl {
273        ChildrenControl {
274            control: self.clone(),
275        }
276    }
277
278    pub fn injection(&self) -> InjectionControl {
279        InjectionControl {
280            control: self.clone(),
281        }
282    }
283
284    pub fn mode(&self) -> ModeControl {
285        ModeControl {
286            control: self.clone(),
287        }
288    }
289
290    pub fn processes(&self) -> ProcessControl {
291        ProcessControl {
292            control: self.clone(),
293        }
294    }
295
296    /// Run `f` against the locked runtime writer, then publish the resulting
297    /// observation. The body is the canonical `lock → call → publish_from`
298    /// stamp shared by nearly every mutating control method; publish happens
299    /// unconditionally once the closure returns.
300    async fn with_writer<F, T>(&self, f: F) -> T
301    where
302        F: AsyncFnOnce(&mut LashRuntime) -> T,
303    {
304        let writer = self.runtime.writer();
305        let mut runtime = writer.lock().await;
306        let value = f(&mut runtime).await;
307        self.runtime.publish_from(&runtime);
308        value
309    }
310
311    async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
312        self.update_session_config(patch.provider, patch.model, patch.prompt)
313            .await?;
314        Ok(())
315    }
316
317    async fn update_session_config(
318        &self,
319        provider: Option<ProviderHandle>,
320        model: Option<lash_core::ModelSpec>,
321        prompt: Option<PromptLayer>,
322    ) -> Result<()> {
323        self.with_writer(async |runtime: &mut LashRuntime| {
324            runtime.update_session_config(provider, model, prompt).await;
325        })
326        .await;
327        Ok(())
328    }
329
330    async fn export_state(&self) -> lash_core::SessionSnapshot {
331        self.runtime.observe().read_view.to_snapshot()
332    }
333
334    async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
335        self.with_writer(async |runtime: &mut LashRuntime| {
336            runtime
337                .append_session_nodes(lash_core::AppendSessionNodesRequest {
338                    nodes: messages
339                        .into_iter()
340                        .map(lash_core::SessionAppendNode::message)
341                        .collect(),
342                    requires_ancestor_node_id: None,
343                })
344                .await
345                .map(|_| ())
346                .map_err(Into::into)
347        })
348        .await
349    }
350
351    async fn append_plugin_body(
352        &self,
353        plugin_type: impl Into<String>,
354        body: serde_json::Value,
355    ) -> Result<()> {
356        self.with_writer(async |runtime: &mut LashRuntime| {
357            runtime
358                .append_session_nodes(lash_core::AppendSessionNodesRequest {
359                    nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
360                    requires_ancestor_node_id: None,
361                })
362                .await
363                .map(|_| ())
364                .map_err(Into::into)
365        })
366        .await
367    }
368
369    async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
370        self.with_writer(async |runtime: &mut LashRuntime| {
371            runtime.set_persisted_state(state).map_err(Into::into)
372        })
373        .await
374    }
375
376    async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
377        self.with_writer(async |runtime: &mut LashRuntime| {
378            runtime.set_prompt_template(template).await;
379        })
380        .await;
381        Ok(())
382    }
383
384    async fn clear_prompt_template(&self) -> Result<()> {
385        self.with_writer(async |runtime: &mut LashRuntime| {
386            runtime.clear_prompt_template().await;
387        })
388        .await;
389        Ok(())
390    }
391
392    async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
393        self.with_writer(async |runtime: &mut LashRuntime| {
394            runtime.add_prompt_contribution(contribution).await;
395        })
396        .await;
397        Ok(())
398    }
399
400    async fn replace_prompt_slot(
401        &self,
402        slot: PromptSlot,
403        contributions: impl IntoIterator<Item = PromptContribution>,
404    ) -> Result<()> {
405        self.with_writer(async |runtime: &mut LashRuntime| {
406            runtime.replace_prompt_slot(slot, contributions).await;
407        })
408        .await;
409        Ok(())
410    }
411
412    async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
413        self.with_writer(async |runtime: &mut LashRuntime| {
414            runtime.clear_prompt_slot(slot).await;
415        })
416        .await;
417        Ok(())
418    }
419
420    async fn apply_protocol_session_extension(
421        &self,
422        extension: lash_core::ProtocolSessionExtensionHandle,
423    ) -> Result<()> {
424        self.with_writer(async |runtime: &mut LashRuntime| {
425            runtime
426                .apply_protocol_session_extension(extension)
427                .await
428                .map_err(Into::into)
429        })
430        .await
431    }
432
433    async fn branch_to_node(
434        &self,
435        target_leaf: Option<String>,
436    ) -> Result<lash_core::SessionSnapshot> {
437        self.with_writer(async |runtime: &mut LashRuntime| {
438            runtime
439                .branch_to_node(target_leaf)
440                .await
441                .map_err(Into::into)
442        })
443        .await
444    }
445
446    async fn await_background_work(&self) -> Result<()> {
447        self.with_writer(async |runtime: &mut LashRuntime| {
448            runtime.await_background_work().await.map_err(Into::into)
449        })
450        .await
451    }
452
453    async fn refresh_tool_surface(&self) -> Result<()> {
454        self.with_writer(async |runtime: &mut LashRuntime| {
455            runtime
456                .refresh_session_tool_surface()
457                .await
458                .map_err(Into::into)
459        })
460        .await
461    }
462
463    async fn submit_session_command(
464        &self,
465        command: lash_core::SessionCommand,
466        idempotency_key: impl Into<String>,
467    ) -> Result<lash_core::SessionCommandReceipt> {
468        let idempotency_key = idempotency_key.into();
469        self.with_writer(async |runtime: &mut LashRuntime| {
470            runtime
471                .submit_session_command(command, idempotency_key)
472                .await
473                .map_err(Into::into)
474        })
475        .await
476    }
477
478    async fn list_lashlang_trigger_registrations(
479        &self,
480    ) -> Result<Vec<lash_core::TriggerRegistration>> {
481        self.with_writer(async |runtime: &mut LashRuntime| {
482            runtime
483                .list_lashlang_trigger_registrations()
484                .await
485                .map_err(Into::into)
486        })
487        .await
488    }
489
490    async fn lashlang_trigger_registrations_by_source_type(
491        &self,
492        source_type: impl Into<lash_core::TriggerSourceType>,
493    ) -> Result<Vec<lash_core::TriggerRegistration>> {
494        self.with_writer(async |runtime: &mut LashRuntime| {
495            runtime
496                .lashlang_trigger_registrations_by_source_type(source_type)
497                .await
498                .map_err(Into::into)
499        })
500        .await
501    }
502
503    async fn invoke_plugin_action(
504        &self,
505        name: &str,
506        args: serde_json::Value,
507    ) -> Result<ToolResult> {
508        let session_id = self.runtime.observe().session_id().to_string();
509        let writer = self.runtime.writer();
510        writer
511            .lock()
512            .await
513            .invoke_plugin_action(name, args, Some(session_id))
514            .await
515            .map_err(Into::into)
516    }
517
518    async fn call_plugin_action<Op: lash_core::PluginAction>(
519        &self,
520        args: Op::Args,
521    ) -> Result<Op::Output> {
522        let result = self
523            .invoke_plugin_action(
524                Op::NAME,
525                serde_json::to_value(args).map_err(|err| {
526                    EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
527                        "invalid {} args: {err}",
528                        Op::NAME
529                    )))
530                })?,
531            )
532            .await?;
533        if !result.is_success() {
534            return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
535                "{} failed: {}",
536                Op::NAME,
537                result.value_for_projection()
538            ))));
539        }
540        serde_json::from_value(result.into_output().value_for_projection()).map_err(|err| {
541            EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
542                "invalid {} output: {err}",
543                Op::NAME
544            )))
545        })
546    }
547
548    async fn compact_context(
549        &self,
550        instructions: Option<String>,
551        scoped_effect_controller: ScopedEffectController<'_>,
552    ) -> Result<bool> {
553        self.with_writer(async |runtime: &mut LashRuntime| {
554            runtime
555                .compact_context(instructions, scoped_effect_controller)
556                .await
557                .map_err(Into::into)
558        })
559        .await
560    }
561
562    async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
563        self.with_writer(async |runtime: &mut LashRuntime| {
564            runtime.await_background_work().await?;
565            Ok(runtime.export_persisted_state())
566        })
567        .await
568    }
569
570    async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
571        Ok(self.runtime.observe().list_process_handles().await)
572    }
573
574    async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
575        Ok(self.runtime.observe().list_all_process_handles().await)
576    }
577
578    async fn start_process(
579        &self,
580        request: lash_core::ProcessStartRequest,
581        scoped_effect_controller: ScopedEffectController<'_>,
582    ) -> Result<lash_core::ProcessHandleSummary> {
583        let writer = self.runtime.writer();
584        let runtime = writer.lock().await;
585        let session_id = runtime.session_id().to_string();
586        let processes = runtime.process_service()?;
587        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
588        let summary = processes
589            .start_from_request(&session_id, request, scope)
590            .await
591            .map_err(EmbedError::Plugin)?;
592        self.runtime.record_process_changed(
593            SessionProcessEventKind::Started,
594            vec![summary.process_id.clone()],
595        );
596        Ok(summary)
597    }
598
599    async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
600        self.runtime
601            .writer()
602            .lock()
603            .await
604            .session_state_service()
605            .map_err(Into::into)
606    }
607
608    async fn cancel_process(
609        &self,
610        process_id: &str,
611        scoped_effect_controller: ScopedEffectController<'_>,
612    ) -> Result<lash_core::ProcessCancelSummary> {
613        let writer = self.runtime.writer();
614        let runtime = writer.lock().await;
615        let session_id = runtime.session_id().to_string();
616        let processes = runtime.process_service()?;
617        let cancel_ability = runtime.process_cancel_ability();
618        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
619        let summary = cancel_ability
620            .cancel_summary(
621                processes.as_ref(),
622                lash_core::ProcessCancelRequest::new(
623                    &session_id,
624                    process_id,
625                    scope,
626                    lash_core::ProcessCancelSource::HostApi,
627                )
628                .with_reason("requested by host API"),
629            )
630            .await
631            .map_err(EmbedError::Plugin)?;
632        self.runtime.record_process_changed(
633            SessionProcessEventKind::Cancelled,
634            vec![summary.process_id.clone()],
635        );
636        Ok(summary)
637    }
638
639    async fn cancel_visible_processes(
640        &self,
641        scoped_effect_controller: ScopedEffectController<'_>,
642    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
643        let writer = self.runtime.writer();
644        let runtime = writer.lock().await;
645        let session_id = runtime.session_id().to_string();
646        let processes = runtime.process_service()?;
647        let cancel_ability = runtime.process_cancel_ability();
648        let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
649        let summaries = cancel_ability
650            .cancel_all_visible(
651                processes.as_ref(),
652                lash_core::ProcessCancelAllRequest::new(
653                    &session_id,
654                    scope,
655                    lash_core::ProcessCancelSource::HostApi,
656                )
657                .with_reason("requested by host API"),
658            )
659            .await
660            .map_err(EmbedError::Plugin)?;
661        self.runtime.record_process_changed(
662            SessionProcessEventKind::Cancelled,
663            summaries
664                .iter()
665                .map(|summary| summary.process_id.clone())
666                .collect(),
667        );
668        Ok(summaries)
669    }
670
671    async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
672        self.with_writer(async |runtime: &mut LashRuntime| {
673            runtime.snapshot_execution_state().await.map_err(Into::into)
674        })
675        .await
676    }
677
678    async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
679        self.with_writer(async |runtime: &mut LashRuntime| {
680            runtime
681                .restore_execution_state(bytes)
682                .await
683                .map_err(Into::into)
684        })
685        .await
686    }
687
688    async fn tool_state(&self) -> Result<ToolState> {
689        self.runtime.observe().tool_state.clone().ok_or_else(|| {
690            EmbedError::Session(SessionError::Protocol(
691                "runtime session not available".to_string(),
692            ))
693        })
694    }
695
696    async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
697        self.with_writer(async |runtime: &mut LashRuntime| {
698            runtime
699                .apply_tool_state(state)
700                .await
701                .map_err(EmbedError::from)
702        })
703        .await
704    }
705
706    async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
707        self.with_writer(async |runtime: &mut LashRuntime| {
708            runtime
709                .restore_tool_state(state)
710                .await
711                .map_err(EmbedError::from)
712        })
713        .await
714    }
715
716    async fn set_tool_availability(
717        &self,
718        name: &str,
719        availability: ToolAvailability,
720    ) -> Result<u64> {
721        self.set_tool_availability_many(&[(name, availability)])
722            .await
723    }
724
725    async fn set_tool_availability_many<N: AsRef<str>>(
726        &self,
727        updates: &[(N, ToolAvailability)],
728    ) -> Result<u64> {
729        let mut state = self.tool_state().await?;
730        for (name, availability) in updates {
731            state
732                .set_availability(name.as_ref(), Some(*availability))
733                .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
734        }
735        self.apply_tool_state(state).await
736    }
737
738    async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
739        let mut state = self.tool_state().await?;
740        state
741            .set_availability(name, None)
742            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
743        self.apply_tool_state(state).await
744    }
745
746    async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
747        Ok(self.tool_state().await?.tool_manifests())
748    }
749
750    async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
751        let tool_registry = self.tool_registry().await?;
752        let handle = tool_registry
753            .add_tool_provider(provider)
754            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
755        self.refresh_tool_surface().await?;
756        Ok(handle)
757    }
758
759    async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
760        let tool_registry = self.tool_registry().await?;
761        let generation = tool_registry
762            .remove_source(handle)
763            .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
764        self.refresh_tool_surface().await?;
765        Ok(generation)
766    }
767
768    async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
769        let writer = self.runtime.writer();
770        let runtime = writer.lock().await;
771        let lifecycle = runtime.session_lifecycle_service()?;
772        lifecycle.create_session(request).await.map_err(Into::into)
773    }
774
775    async fn close_child_session(&self, session_id: &str) -> Result<()> {
776        let writer = self.runtime.writer();
777        let runtime = writer.lock().await;
778        let lifecycle = runtime.session_lifecycle_service()?;
779        lifecycle
780            .close_session(session_id)
781            .await
782            .map_err(Into::into)
783    }
784
785    async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
786        self.with_writer(async |runtime: &mut LashRuntime| {
787            runtime
788                .activate_managed_session(session_id)
789                .await
790                .map_err(Into::into)
791        })
792        .await
793    }
794
795    async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
796        self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
797            .await
798    }
799
800    async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
801        for input in messages {
802            let source_key = input.id.map(|id| format!("injection:{id}"));
803            let turn_input = turn_input_from_plugin_message(input.message);
804            self.runtime
805                .enqueue_turn_input(
806                    turn_input,
807                    lash_core::DeliveryPolicy::EarliestSafeBoundary,
808                    lash_core::SlotPolicy::Join,
809                    source_key,
810                )
811                .await
812                .map(|_| ())
813                .map_err(EmbedError::Runtime)?;
814        }
815        Ok(())
816    }
817
818    async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
819        self.runtime
820            .writer()
821            .lock()
822            .await
823            .plugin_session()
824            .map(|session| session.tool_registry())
825            .ok_or_else(|| {
826                EmbedError::Session(SessionError::Protocol(
827                    "tool registry is unavailable in this runtime session".to_string(),
828                ))
829            })
830    }
831}
832
833fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
834    let mut input = TurnInput::empty();
835    if !message.content.is_empty() {
836        input.items.push(InputItem::Text {
837            text: message.content,
838        });
839    }
840    for (index, bytes) in message.images.into_iter().enumerate() {
841        let id = format!("injected-image-{index}");
842        input.items.push(InputItem::ImageRef { id: id.clone() });
843        input.image_blobs.insert(id, bytes);
844    }
845    input
846}
847
848#[derive(Clone)]
849pub struct ConfigControl {
850    control: SessionControl,
851}
852
853impl ConfigControl {
854    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
855        self.control.update_config(patch).await
856    }
857
858    pub async fn update_session_config(
859        &self,
860        provider: Option<ProviderHandle>,
861        model: Option<lash_core::ModelSpec>,
862        prompt: Option<PromptLayer>,
863    ) -> Result<()> {
864        self.control
865            .update_session_config(provider, model, prompt)
866            .await
867    }
868
869    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
870        self.control.set_prompt_template(template).await
871    }
872
873    pub async fn clear_prompt_template(&self) -> Result<()> {
874        self.control.clear_prompt_template().await
875    }
876
877    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
878        self.control.add_prompt_contribution(contribution).await
879    }
880
881    pub async fn replace_prompt_slot(
882        &self,
883        slot: PromptSlot,
884        contributions: impl IntoIterator<Item = PromptContribution>,
885    ) -> Result<()> {
886        self.control.replace_prompt_slot(slot, contributions).await
887    }
888
889    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
890        self.control.clear_prompt_slot(slot).await
891    }
892}
893
894#[derive(Clone)]
895pub struct ToolsControl {
896    control: SessionControl,
897}
898
899impl ToolsControl {
900    pub(crate) fn new(control: SessionControl) -> Self {
901        Self { control }
902    }
903}
904
905impl ToolsControl {
906    pub async fn state(&self) -> Result<ToolState> {
907        self.control.tool_state().await
908    }
909
910    pub fn advanced(&self) -> AdvancedToolsControl {
911        AdvancedToolsControl {
912            control: self.control.clone(),
913        }
914    }
915
916    pub async fn set_availability(
917        &self,
918        name: impl AsRef<str>,
919        availability: ToolAvailability,
920    ) -> Result<u64> {
921        self.control
922            .set_tool_availability(name.as_ref(), availability)
923            .await
924    }
925
926    pub async fn set_availability_many<N: AsRef<str>>(
927        &self,
928        updates: &[(N, ToolAvailability)],
929    ) -> Result<u64> {
930        self.control.set_tool_availability_many(updates).await
931    }
932
933    pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
934        self.control
935            .clear_tool_availability_override(name.as_ref())
936            .await
937    }
938
939    pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
940        self.control.active_tool_manifests().await
941    }
942
943    pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
944        self.control.add_tool_provider(provider).await
945    }
946
947    pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
948        self.control.remove_tool_source(handle).await
949    }
950}
951
952#[derive(Clone)]
953pub struct AdvancedToolsControl {
954    control: SessionControl,
955}
956
957impl AdvancedToolsControl {
958    /// Replace the entire tool-state snapshot.
959    ///
960    /// This is a generation-checked escape hatch for hosts that intentionally
961    /// edit the full snapshot. Prefer `ToolsControl` availability methods for
962    /// ordinary tool policy changes.
963    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
964        self.control.apply_tool_state(state).await
965    }
966
967    /// Restore a persisted tool-state snapshot, adopting its generation.
968    ///
969    /// Use this when re-applying a snapshot read from durable storage (session
970    /// resume), not an edited delta: it reconstructs the exact persisted surface
971    /// idempotently rather than requiring the snapshot to match the current
972    /// generation. A cold resume of a session whose surface reached generation
973    /// ≥ 2 needs this — [`apply_state`](Self::apply_state) would reject it.
974    ///
975    /// Persisted tools whose source is not currently registered (e.g. a
976    /// detached MCP server) do not fail the restore: they are kept as orphans,
977    /// forced `Off`, listed in the returned [`ToolRestoreReport`], and rebind
978    /// automatically when a source re-advertises the same tool.
979    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
980        self.control.restore_tool_state(state).await
981    }
982}
983
984#[derive(Clone)]
985pub struct SessionCommandsControl {
986    control: SessionControl,
987}
988
989impl SessionCommandsControl {
990    /// Enqueue an unconditional tool-surface refresh. The command drains
991    /// asynchronously and recomputes the surface from live sources, so it
992    /// takes no generation guard — any generation observed at enqueue time
993    /// could legitimately have advanced by drain time.
994    pub async fn refresh_tool_surface(
995        &self,
996        reason: impl Into<String>,
997        idempotency_key: impl Into<String>,
998    ) -> Result<lash_core::SessionCommandReceipt> {
999        self.control
1000            .submit_session_command(
1001                lash_core::SessionCommand::RefreshToolSurface {
1002                    reason: reason.into(),
1003                },
1004                idempotency_key,
1005            )
1006            .await
1007    }
1008
1009    pub async fn reset(
1010        &self,
1011        reason: impl Into<String>,
1012        idempotency_key: impl Into<String>,
1013    ) -> Result<lash_core::SessionCommandReceipt> {
1014        self.control
1015            .submit_session_command(
1016                lash_core::SessionCommand::ResetSession {
1017                    reason: reason.into(),
1018                },
1019                idempotency_key,
1020            )
1021            .await
1022    }
1023}
1024
1025/// Session-scoped read controls for Lashlang trigger registrations.
1026#[derive(Clone)]
1027pub struct TriggersControl {
1028    control: SessionControl,
1029}
1030
1031impl TriggersControl {
1032    /// Return every trigger registration in the session.
1033    ///
1034    /// This is an admin/introspection view. Source owners should prefer
1035    /// [`Self::by_source_type`] so they only inspect registrations for the
1036    /// concrete source type they own.
1037    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1038        self.control.list_lashlang_trigger_registrations().await
1039    }
1040
1041    /// Return registrations whose source value has the given host value type.
1042    ///
1043    /// This is the source-owner API: a timer, UI, webhook, or other host-owned
1044    /// source uses it to inspect registrations for keys it may schedule and emit.
1045    pub async fn by_source_type(
1046        &self,
1047        source_type: impl Into<lash_core::TriggerSourceType>,
1048    ) -> Result<Vec<lash_core::TriggerRegistration>> {
1049        self.control
1050            .lashlang_trigger_registrations_by_source_type(source_type)
1051            .await
1052    }
1053}
1054
1055#[derive(Clone)]
1056pub struct ProcessControl {
1057    control: SessionControl,
1058}
1059
1060impl ProcessControl {
1061    pub(crate) fn new(control: SessionControl) -> Self {
1062        Self { control }
1063    }
1064
1065    pub async fn start(
1066        &self,
1067        request: lash_core::ProcessStartRequest,
1068        scoped_effect_controller: ScopedEffectController<'_>,
1069    ) -> Result<lash_core::ProcessHandleSummary> {
1070        self.control
1071            .start_process(request, scoped_effect_controller)
1072            .await
1073    }
1074
1075    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1076        self.control.list_process_handles().await
1077    }
1078
1079    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1080        self.control.list_all_process_handles().await
1081    }
1082
1083    pub async fn await_all(&self) -> Result<()> {
1084        self.control.await_background_work().await
1085    }
1086
1087    pub async fn cancel(
1088        &self,
1089        process_id: &str,
1090        scoped_effect_controller: ScopedEffectController<'_>,
1091    ) -> Result<lash_core::ProcessCancelSummary> {
1092        self.control
1093            .cancel_process(process_id, scoped_effect_controller)
1094            .await
1095    }
1096
1097    pub async fn cancel_all(
1098        &self,
1099        scoped_effect_controller: ScopedEffectController<'_>,
1100    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1101        self.control
1102            .cancel_visible_processes(scoped_effect_controller)
1103            .await
1104    }
1105}
1106
1107#[derive(Clone)]
1108pub struct StateControl {
1109    control: SessionControl,
1110}
1111
1112impl StateControl {
1113    pub async fn export(&self) -> lash_core::SessionSnapshot {
1114        self.control.export_state().await
1115    }
1116
1117    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1118        self.control.append_messages(messages).await
1119    }
1120
1121    pub async fn append_plugin_body(
1122        &self,
1123        plugin_type: impl Into<String>,
1124        body: serde_json::Value,
1125    ) -> Result<()> {
1126        self.control.append_plugin_body(plugin_type, body).await
1127    }
1128
1129    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1130        self.control.set_persisted_state(state).await
1131    }
1132
1133    pub async fn branch_to_node(
1134        &self,
1135        target_leaf: Option<String>,
1136    ) -> Result<lash_core::SessionSnapshot> {
1137        self.control.branch_to_node(target_leaf).await
1138    }
1139
1140    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1141        self.control.persist_current_state().await
1142    }
1143
1144    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1145        self.control.session_state_service().await
1146    }
1147
1148    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1149        self.control.snapshot_execution_state().await
1150    }
1151
1152    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1153        self.control.restore_execution_state(bytes).await
1154    }
1155
1156    pub async fn compact_context(
1157        &self,
1158        instructions: Option<String>,
1159        scoped_effect_controller: ScopedEffectController<'_>,
1160    ) -> Result<bool> {
1161        self.control
1162            .compact_context(instructions, scoped_effect_controller)
1163            .await
1164    }
1165}
1166
1167#[derive(Clone)]
1168pub struct PluginActions {
1169    pub(crate) control: SessionControl,
1170}
1171
1172impl PluginActions {
1173    pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1174        self.control.call_plugin_action::<Op>(args).await
1175    }
1176}
1177
1178#[derive(Clone)]
1179pub struct ChildrenControl {
1180    control: SessionControl,
1181}
1182
1183impl ChildrenControl {
1184    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1185        self.control.create_child_session(request).await
1186    }
1187
1188    pub async fn close_session(&self, session_id: &str) -> Result<()> {
1189        self.control.close_child_session(session_id).await
1190    }
1191
1192    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1193        self.control.activate_managed_session(session_id).await
1194    }
1195}
1196
1197#[derive(Clone)]
1198pub struct InjectionControl {
1199    control: SessionControl,
1200}
1201
1202impl InjectionControl {
1203    pub async fn inject_turn_input(
1204        &self,
1205        id: Option<String>,
1206        message: PluginMessage,
1207    ) -> Result<()> {
1208        self.control.inject_turn_input(id, message).await
1209    }
1210
1211    pub async fn inject_turn_inputs(
1212        &self,
1213        messages: Vec<lash_core::InjectedTurnInput>,
1214    ) -> Result<()> {
1215        self.control.inject_turn_inputs(messages).await
1216    }
1217}
1218
1219#[derive(Clone)]
1220pub struct ModeControl {
1221    control: SessionControl,
1222}
1223
1224impl ModeControl {
1225    pub async fn apply_session_extension(
1226        &self,
1227        extension: lash_core::ProtocolSessionExtensionHandle,
1228    ) -> Result<()> {
1229        self.control
1230            .apply_protocol_session_extension(extension)
1231            .await
1232    }
1233}