1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct HostEventsControl {
7 pub(crate) core: LashCore,
8}
9
10impl HostEventsControl {
11 pub async fn emit(
12 &self,
13 request: lash_core::HostEventOccurrenceRequest,
14 scoped_effect_controller: ScopedEffectController<'_>,
15 ) -> Result<lash_core::HostEventEmitReport> {
16 let store = self.core.env.host_event_store.as_ref().ok_or_else(|| {
17 EmbedError::Plugin(lash_core::PluginError::Session(
18 "host event store is unavailable in this runtime".to_string(),
19 ))
20 })?;
21 let process_work_poke = self.core.process_work_runner.poke().await;
22 let router = lash_core::HostEventRouter::new(
23 Arc::clone(store),
24 Arc::clone(&self.core.env.core.durability.lashlang_artifact_store),
25 self.core.env.process_registry.clone(),
26 process_work_poke,
27 self.core.env.core.profile.host_profile_id.clone(),
28 );
29 router
30 .emit(request, scoped_effect_controller.controller())
31 .await
32 .map_err(Into::into)
33 }
34
35 pub async fn subscriptions(
36 &self,
37 filter: lash_core::TriggerSubscriptionFilter,
38 ) -> Result<Vec<lash_core::TriggerRegistration>> {
39 let store = self.core.env.host_event_store.as_ref().ok_or_else(|| {
40 EmbedError::Plugin(lash_core::PluginError::Session(
41 "host event store is unavailable in this runtime".to_string(),
42 ))
43 })?;
44 let records = store.list_subscriptions(filter).await?;
45 Ok(records
46 .iter()
47 .map(lash_core::TriggerRegistration::from)
48 .collect())
49 }
50}
51
52#[derive(Clone)]
53pub struct Processes {
54 pub(crate) core: LashCore,
55}
56
57impl Processes {
58 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
59 self.core
60 .env
61 .process_registry
62 .as_ref()
63 .cloned()
64 .ok_or_else(|| {
65 EmbedError::Plugin(lash_core::PluginError::Session(
66 "process registry is unavailable in this runtime".to_string(),
67 ))
68 })
69 }
70
71 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
72 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
73 }
74
75 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
76 let effect_id = command.effect_id();
77 lash_core::RuntimeInvocation::effect(
78 lash_core::runtime::RuntimeScope::new("runtime"),
79 effect_id.clone(),
80 lash_core::RuntimeEffectKind::Process,
81 effect_id,
82 )
83 }
84
85 async fn run_command(
86 &self,
87 command: lash_core::ProcessCommand,
88 scoped_effect_controller: ScopedEffectController<'_>,
89 ) -> Result<lash_core::ProcessEffectOutcome> {
90 let registry = self.registry()?;
91 let invocation = Self::process_invocation(&command);
92 let outcome = scoped_effect_controller
93 .controller()
94 .execute_effect(
95 lash_core::RuntimeEffectEnvelope::new(
96 invocation,
97 lash_core::RuntimeEffectCommand::process(command),
98 ),
99 lash_core::RuntimeEffectLocalExecutor::process_control(registry),
100 )
101 .await
102 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
103 match outcome {
104 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
105 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
106 "process effect returned non-process outcome".to_string(),
107 ))),
108 }
109 }
110
111 pub async fn start(
112 &self,
113 request: lash_core::ProcessStartRequest,
114 scoped_effect_controller: ScopedEffectController<'_>,
115 ) -> Result<lash_core::ProcessRecord> {
116 let env_ref = match request.env_spec.as_ref() {
117 Some(env_spec) => Some(
118 lash_core::runtime::persist_process_execution_env(
119 self.core
120 .env
121 .core
122 .durability
123 .lashlang_artifact_store
124 .as_ref(),
125 env_spec,
126 )
127 .await?,
128 ),
129 None => None,
130 };
131 let grant = request.grant.clone();
132 let registration =
133 request.into_registration(self.core.env.core.profile.host_profile_id.clone(), env_ref);
134 let command = lash_core::ProcessCommand::Start {
135 registration,
136 grant,
137 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
138 };
139 let outcome = self.run_command(command, scoped_effect_controller).await?;
140 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
141 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
142 "process start returned the wrong outcome".to_string(),
143 )));
144 };
145 if let Some(poke) = self.core.process_work_runner.poke().await {
146 poke.poke();
147 }
148 Ok(record)
149 }
150
151 pub async fn list(
152 &self,
153 filter: &lash_core::ProcessListFilter,
154 ) -> Result<Vec<lash_core::ObservedProcess>> {
155 self.make_observer()?.list(filter).await.map_err(Into::into)
156 }
157
158 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
159 Ok(self.make_observer()?.process(process_id).await)
160 }
161
162 pub async fn events(
163 &self,
164 process_id: &str,
165 after_sequence: u64,
166 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
167 self.make_observer()?
168 .events_after(process_id, after_sequence)
169 .await
170 .map_err(Into::into)
171 }
172
173 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
174 self.registry()?
175 .await_process(process_id)
176 .await
177 .map_err(Into::into)
178 }
179
180 pub async fn cancel(
181 &self,
182 process_id: &str,
183 scoped_effect_controller: ScopedEffectController<'_>,
184 ) -> Result<lash_core::ProcessCancelSummary> {
185 let command = lash_core::ProcessCommand::Cancel {
186 process_id: process_id.to_string(),
187 reason: Some("requested by host".to_string()),
188 };
189 let outcome = self.run_command(command, scoped_effect_controller).await?;
190 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
191 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
192 "process cancel returned the wrong outcome".to_string(),
193 )));
194 };
195 Ok(lash_core::ProcessCancelSummary::from_record(record))
196 }
197
198 pub async fn signal(
199 &self,
200 process_id: &str,
201 signal_name: impl Into<String>,
202 signal_id: impl Into<String>,
203 request: lash_core::ProcessEventAppendRequest,
204 scoped_effect_controller: ScopedEffectController<'_>,
205 ) -> Result<lash_core::ProcessEvent> {
206 let command = lash_core::ProcessCommand::Signal {
207 process_id: process_id.to_string(),
208 signal_name: signal_name.into(),
209 signal_id: signal_id.into(),
210 request,
211 };
212 let outcome = self.run_command(command, scoped_effect_controller).await?;
213 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
214 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
215 "process signal returned the wrong outcome".to_string(),
216 )));
217 };
218 Ok(event)
219 }
220
221 pub async fn session_snapshot(
222 &self,
223 session_id: impl Into<String>,
224 ) -> Result<lash_core::ProcessWorkSnapshot> {
225 self.make_observer()?
226 .snapshot_for_session(session_id)
227 .await
228 .map_err(Into::into)
229 }
230
231 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
232 self.make_observer()
233 }
234}
235
236#[derive(Clone)]
237pub struct SessionControl {
238 pub(crate) runtime: RuntimeHandle,
239}
240
241impl SessionControl {
242 pub fn config(&self) -> ConfigControl {
243 ConfigControl {
244 control: self.clone(),
245 }
246 }
247
248 pub fn tools(&self) -> ToolsControl {
249 ToolsControl {
250 control: self.clone(),
251 }
252 }
253
254 pub fn commands(&self) -> SessionCommandsControl {
255 SessionCommandsControl {
256 control: self.clone(),
257 }
258 }
259
260 pub fn triggers(&self) -> TriggersControl {
261 TriggersControl {
262 control: self.clone(),
263 }
264 }
265
266 pub fn state(&self) -> StateControl {
267 StateControl {
268 control: self.clone(),
269 }
270 }
271
272 pub fn children(&self) -> ChildrenControl {
273 ChildrenControl {
274 control: self.clone(),
275 }
276 }
277
278 pub fn injection(&self) -> InjectionControl {
279 InjectionControl {
280 control: self.clone(),
281 }
282 }
283
284 pub fn mode(&self) -> ModeControl {
285 ModeControl {
286 control: self.clone(),
287 }
288 }
289
290 pub fn processes(&self) -> ProcessControl {
291 ProcessControl {
292 control: self.clone(),
293 }
294 }
295
296 async fn with_writer<F, T>(&self, f: F) -> T
301 where
302 F: AsyncFnOnce(&mut LashRuntime) -> T,
303 {
304 let writer = self.runtime.writer();
305 let mut runtime = writer.lock().await;
306 let value = f(&mut runtime).await;
307 self.runtime.publish_from(&runtime);
308 value
309 }
310
311 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
312 self.update_session_config(patch.provider, patch.model, patch.prompt)
313 .await?;
314 Ok(())
315 }
316
317 async fn update_session_config(
318 &self,
319 provider: Option<ProviderHandle>,
320 model: Option<lash_core::ModelSpec>,
321 prompt: Option<PromptLayer>,
322 ) -> Result<()> {
323 self.with_writer(async |runtime: &mut LashRuntime| {
324 runtime.update_session_config(provider, model, prompt).await;
325 })
326 .await;
327 Ok(())
328 }
329
330 async fn export_state(&self) -> lash_core::SessionSnapshot {
331 self.runtime.observe().read_view.to_snapshot()
332 }
333
334 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
335 self.with_writer(async |runtime: &mut LashRuntime| {
336 runtime
337 .append_session_nodes(lash_core::AppendSessionNodesRequest {
338 nodes: messages
339 .into_iter()
340 .map(lash_core::SessionAppendNode::message)
341 .collect(),
342 requires_ancestor_node_id: None,
343 })
344 .await
345 .map(|_| ())
346 .map_err(Into::into)
347 })
348 .await
349 }
350
351 async fn append_plugin_body(
352 &self,
353 plugin_type: impl Into<String>,
354 body: serde_json::Value,
355 ) -> Result<()> {
356 self.with_writer(async |runtime: &mut LashRuntime| {
357 runtime
358 .append_session_nodes(lash_core::AppendSessionNodesRequest {
359 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
360 requires_ancestor_node_id: None,
361 })
362 .await
363 .map(|_| ())
364 .map_err(Into::into)
365 })
366 .await
367 }
368
369 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
370 self.with_writer(async |runtime: &mut LashRuntime| {
371 runtime.set_persisted_state(state).map_err(Into::into)
372 })
373 .await
374 }
375
376 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
377 self.with_writer(async |runtime: &mut LashRuntime| {
378 runtime.set_prompt_template(template).await;
379 })
380 .await;
381 Ok(())
382 }
383
384 async fn clear_prompt_template(&self) -> Result<()> {
385 self.with_writer(async |runtime: &mut LashRuntime| {
386 runtime.clear_prompt_template().await;
387 })
388 .await;
389 Ok(())
390 }
391
392 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
393 self.with_writer(async |runtime: &mut LashRuntime| {
394 runtime.add_prompt_contribution(contribution).await;
395 })
396 .await;
397 Ok(())
398 }
399
400 async fn replace_prompt_slot(
401 &self,
402 slot: PromptSlot,
403 contributions: impl IntoIterator<Item = PromptContribution>,
404 ) -> Result<()> {
405 self.with_writer(async |runtime: &mut LashRuntime| {
406 runtime.replace_prompt_slot(slot, contributions).await;
407 })
408 .await;
409 Ok(())
410 }
411
412 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
413 self.with_writer(async |runtime: &mut LashRuntime| {
414 runtime.clear_prompt_slot(slot).await;
415 })
416 .await;
417 Ok(())
418 }
419
420 async fn apply_protocol_session_extension(
421 &self,
422 extension: lash_core::ProtocolSessionExtensionHandle,
423 ) -> Result<()> {
424 self.with_writer(async |runtime: &mut LashRuntime| {
425 runtime
426 .apply_protocol_session_extension(extension)
427 .await
428 .map_err(Into::into)
429 })
430 .await
431 }
432
433 async fn branch_to_node(
434 &self,
435 target_leaf: Option<String>,
436 ) -> Result<lash_core::SessionSnapshot> {
437 self.with_writer(async |runtime: &mut LashRuntime| {
438 runtime
439 .branch_to_node(target_leaf)
440 .await
441 .map_err(Into::into)
442 })
443 .await
444 }
445
446 async fn await_background_work(&self) -> Result<()> {
447 self.with_writer(async |runtime: &mut LashRuntime| {
448 runtime.await_background_work().await.map_err(Into::into)
449 })
450 .await
451 }
452
453 async fn refresh_tool_surface(&self) -> Result<()> {
454 self.with_writer(async |runtime: &mut LashRuntime| {
455 runtime
456 .refresh_session_tool_surface()
457 .await
458 .map_err(Into::into)
459 })
460 .await
461 }
462
463 async fn submit_session_command(
464 &self,
465 command: lash_core::SessionCommand,
466 idempotency_key: impl Into<String>,
467 ) -> Result<lash_core::SessionCommandReceipt> {
468 let idempotency_key = idempotency_key.into();
469 self.with_writer(async |runtime: &mut LashRuntime| {
470 runtime
471 .submit_session_command(command, idempotency_key)
472 .await
473 .map_err(Into::into)
474 })
475 .await
476 }
477
478 async fn list_lashlang_trigger_registrations(
479 &self,
480 ) -> Result<Vec<lash_core::TriggerRegistration>> {
481 self.with_writer(async |runtime: &mut LashRuntime| {
482 runtime
483 .list_lashlang_trigger_registrations()
484 .await
485 .map_err(Into::into)
486 })
487 .await
488 }
489
490 async fn lashlang_trigger_registrations_by_source_type(
491 &self,
492 source_type: impl Into<lash_core::TriggerSourceType>,
493 ) -> Result<Vec<lash_core::TriggerRegistration>> {
494 self.with_writer(async |runtime: &mut LashRuntime| {
495 runtime
496 .lashlang_trigger_registrations_by_source_type(source_type)
497 .await
498 .map_err(Into::into)
499 })
500 .await
501 }
502
503 async fn invoke_plugin_action(
504 &self,
505 name: &str,
506 args: serde_json::Value,
507 ) -> Result<ToolResult> {
508 let session_id = self.runtime.observe().session_id().to_string();
509 let writer = self.runtime.writer();
510 writer
511 .lock()
512 .await
513 .invoke_plugin_action(name, args, Some(session_id))
514 .await
515 .map_err(Into::into)
516 }
517
518 async fn call_plugin_action<Op: lash_core::PluginAction>(
519 &self,
520 args: Op::Args,
521 ) -> Result<Op::Output> {
522 let result = self
523 .invoke_plugin_action(
524 Op::NAME,
525 serde_json::to_value(args).map_err(|err| {
526 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
527 "invalid {} args: {err}",
528 Op::NAME
529 )))
530 })?,
531 )
532 .await?;
533 if !result.is_success() {
534 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
535 "{} failed: {}",
536 Op::NAME,
537 result.value_for_projection()
538 ))));
539 }
540 serde_json::from_value(result.into_output().value_for_projection()).map_err(|err| {
541 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
542 "invalid {} output: {err}",
543 Op::NAME
544 )))
545 })
546 }
547
548 async fn compact_context(
549 &self,
550 instructions: Option<String>,
551 scoped_effect_controller: ScopedEffectController<'_>,
552 ) -> Result<bool> {
553 self.with_writer(async |runtime: &mut LashRuntime| {
554 runtime
555 .compact_context(instructions, scoped_effect_controller)
556 .await
557 .map_err(Into::into)
558 })
559 .await
560 }
561
562 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
563 self.with_writer(async |runtime: &mut LashRuntime| {
564 runtime.await_background_work().await?;
565 Ok(runtime.export_persisted_state())
566 })
567 .await
568 }
569
570 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
571 Ok(self.runtime.observe().list_process_handles().await)
572 }
573
574 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
575 Ok(self.runtime.observe().list_all_process_handles().await)
576 }
577
578 async fn start_process(
579 &self,
580 request: lash_core::ProcessStartRequest,
581 scoped_effect_controller: ScopedEffectController<'_>,
582 ) -> Result<lash_core::ProcessHandleSummary> {
583 let writer = self.runtime.writer();
584 let runtime = writer.lock().await;
585 let session_id = runtime.session_id().to_string();
586 let processes = runtime.process_service()?;
587 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
588 let summary = processes
589 .start_from_request(&session_id, request, scope)
590 .await
591 .map_err(EmbedError::Plugin)?;
592 self.runtime.record_process_changed(
593 SessionProcessEventKind::Started,
594 vec![summary.process_id.clone()],
595 );
596 Ok(summary)
597 }
598
599 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
600 self.runtime
601 .writer()
602 .lock()
603 .await
604 .session_state_service()
605 .map_err(Into::into)
606 }
607
608 async fn cancel_process(
609 &self,
610 process_id: &str,
611 scoped_effect_controller: ScopedEffectController<'_>,
612 ) -> Result<lash_core::ProcessCancelSummary> {
613 let writer = self.runtime.writer();
614 let runtime = writer.lock().await;
615 let session_id = runtime.session_id().to_string();
616 let processes = runtime.process_service()?;
617 let cancel_ability = runtime.process_cancel_ability();
618 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
619 let summary = cancel_ability
620 .cancel_summary(
621 processes.as_ref(),
622 lash_core::ProcessCancelRequest::new(
623 &session_id,
624 process_id,
625 scope,
626 lash_core::ProcessCancelSource::HostApi,
627 )
628 .with_reason("requested by host API"),
629 )
630 .await
631 .map_err(EmbedError::Plugin)?;
632 self.runtime.record_process_changed(
633 SessionProcessEventKind::Cancelled,
634 vec![summary.process_id.clone()],
635 );
636 Ok(summary)
637 }
638
639 async fn cancel_visible_processes(
640 &self,
641 scoped_effect_controller: ScopedEffectController<'_>,
642 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
643 let writer = self.runtime.writer();
644 let runtime = writer.lock().await;
645 let session_id = runtime.session_id().to_string();
646 let processes = runtime.process_service()?;
647 let cancel_ability = runtime.process_cancel_ability();
648 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
649 let summaries = cancel_ability
650 .cancel_all_visible(
651 processes.as_ref(),
652 lash_core::ProcessCancelAllRequest::new(
653 &session_id,
654 scope,
655 lash_core::ProcessCancelSource::HostApi,
656 )
657 .with_reason("requested by host API"),
658 )
659 .await
660 .map_err(EmbedError::Plugin)?;
661 self.runtime.record_process_changed(
662 SessionProcessEventKind::Cancelled,
663 summaries
664 .iter()
665 .map(|summary| summary.process_id.clone())
666 .collect(),
667 );
668 Ok(summaries)
669 }
670
671 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
672 self.with_writer(async |runtime: &mut LashRuntime| {
673 runtime.snapshot_execution_state().await.map_err(Into::into)
674 })
675 .await
676 }
677
678 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
679 self.with_writer(async |runtime: &mut LashRuntime| {
680 runtime
681 .restore_execution_state(bytes)
682 .await
683 .map_err(Into::into)
684 })
685 .await
686 }
687
688 async fn tool_state(&self) -> Result<ToolState> {
689 self.runtime.observe().tool_state.clone().ok_or_else(|| {
690 EmbedError::Session(SessionError::Protocol(
691 "runtime session not available".to_string(),
692 ))
693 })
694 }
695
696 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
697 self.with_writer(async |runtime: &mut LashRuntime| {
698 runtime
699 .apply_tool_state(state)
700 .await
701 .map_err(EmbedError::from)
702 })
703 .await
704 }
705
706 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
707 self.with_writer(async |runtime: &mut LashRuntime| {
708 runtime
709 .restore_tool_state(state)
710 .await
711 .map_err(EmbedError::from)
712 })
713 .await
714 }
715
716 async fn set_tool_availability(
717 &self,
718 name: &str,
719 availability: ToolAvailability,
720 ) -> Result<u64> {
721 self.set_tool_availability_many(&[(name, availability)])
722 .await
723 }
724
725 async fn set_tool_availability_many<N: AsRef<str>>(
726 &self,
727 updates: &[(N, ToolAvailability)],
728 ) -> Result<u64> {
729 let mut state = self.tool_state().await?;
730 for (name, availability) in updates {
731 state
732 .set_availability(name.as_ref(), Some(*availability))
733 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
734 }
735 self.apply_tool_state(state).await
736 }
737
738 async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
739 let mut state = self.tool_state().await?;
740 state
741 .set_availability(name, None)
742 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
743 self.apply_tool_state(state).await
744 }
745
746 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
747 Ok(self.tool_state().await?.tool_manifests())
748 }
749
750 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
751 let tool_registry = self.tool_registry().await?;
752 let handle = tool_registry
753 .add_tool_provider(provider)
754 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
755 self.refresh_tool_surface().await?;
756 Ok(handle)
757 }
758
759 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
760 let tool_registry = self.tool_registry().await?;
761 let generation = tool_registry
762 .remove_source(handle)
763 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
764 self.refresh_tool_surface().await?;
765 Ok(generation)
766 }
767
768 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
769 let writer = self.runtime.writer();
770 let runtime = writer.lock().await;
771 let lifecycle = runtime.session_lifecycle_service()?;
772 lifecycle.create_session(request).await.map_err(Into::into)
773 }
774
775 async fn close_child_session(&self, session_id: &str) -> Result<()> {
776 let writer = self.runtime.writer();
777 let runtime = writer.lock().await;
778 let lifecycle = runtime.session_lifecycle_service()?;
779 lifecycle
780 .close_session(session_id)
781 .await
782 .map_err(Into::into)
783 }
784
785 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
786 self.with_writer(async |runtime: &mut LashRuntime| {
787 runtime
788 .activate_managed_session(session_id)
789 .await
790 .map_err(Into::into)
791 })
792 .await
793 }
794
795 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
796 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
797 .await
798 }
799
800 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
801 for input in messages {
802 let source_key = input.id.map(|id| format!("injection:{id}"));
803 let turn_input = turn_input_from_plugin_message(input.message);
804 self.runtime
805 .enqueue_turn_input(
806 turn_input,
807 lash_core::DeliveryPolicy::EarliestSafeBoundary,
808 lash_core::SlotPolicy::Join,
809 source_key,
810 )
811 .await
812 .map(|_| ())
813 .map_err(EmbedError::Runtime)?;
814 }
815 Ok(())
816 }
817
818 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
819 self.runtime
820 .writer()
821 .lock()
822 .await
823 .plugin_session()
824 .map(|session| session.tool_registry())
825 .ok_or_else(|| {
826 EmbedError::Session(SessionError::Protocol(
827 "tool registry is unavailable in this runtime session".to_string(),
828 ))
829 })
830 }
831}
832
833fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
834 let mut input = TurnInput::empty();
835 if !message.content.is_empty() {
836 input.items.push(InputItem::Text {
837 text: message.content,
838 });
839 }
840 for (index, bytes) in message.images.into_iter().enumerate() {
841 let id = format!("injected-image-{index}");
842 input.items.push(InputItem::ImageRef { id: id.clone() });
843 input.image_blobs.insert(id, bytes);
844 }
845 input
846}
847
848#[derive(Clone)]
849pub struct ConfigControl {
850 control: SessionControl,
851}
852
853impl ConfigControl {
854 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
855 self.control.update_config(patch).await
856 }
857
858 pub async fn update_session_config(
859 &self,
860 provider: Option<ProviderHandle>,
861 model: Option<lash_core::ModelSpec>,
862 prompt: Option<PromptLayer>,
863 ) -> Result<()> {
864 self.control
865 .update_session_config(provider, model, prompt)
866 .await
867 }
868
869 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
870 self.control.set_prompt_template(template).await
871 }
872
873 pub async fn clear_prompt_template(&self) -> Result<()> {
874 self.control.clear_prompt_template().await
875 }
876
877 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
878 self.control.add_prompt_contribution(contribution).await
879 }
880
881 pub async fn replace_prompt_slot(
882 &self,
883 slot: PromptSlot,
884 contributions: impl IntoIterator<Item = PromptContribution>,
885 ) -> Result<()> {
886 self.control.replace_prompt_slot(slot, contributions).await
887 }
888
889 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
890 self.control.clear_prompt_slot(slot).await
891 }
892}
893
894#[derive(Clone)]
895pub struct ToolsControl {
896 control: SessionControl,
897}
898
899impl ToolsControl {
900 pub(crate) fn new(control: SessionControl) -> Self {
901 Self { control }
902 }
903}
904
905impl ToolsControl {
906 pub async fn state(&self) -> Result<ToolState> {
907 self.control.tool_state().await
908 }
909
910 pub fn advanced(&self) -> AdvancedToolsControl {
911 AdvancedToolsControl {
912 control: self.control.clone(),
913 }
914 }
915
916 pub async fn set_availability(
917 &self,
918 name: impl AsRef<str>,
919 availability: ToolAvailability,
920 ) -> Result<u64> {
921 self.control
922 .set_tool_availability(name.as_ref(), availability)
923 .await
924 }
925
926 pub async fn set_availability_many<N: AsRef<str>>(
927 &self,
928 updates: &[(N, ToolAvailability)],
929 ) -> Result<u64> {
930 self.control.set_tool_availability_many(updates).await
931 }
932
933 pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
934 self.control
935 .clear_tool_availability_override(name.as_ref())
936 .await
937 }
938
939 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
940 self.control.active_tool_manifests().await
941 }
942
943 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
944 self.control.add_tool_provider(provider).await
945 }
946
947 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
948 self.control.remove_tool_source(handle).await
949 }
950}
951
952#[derive(Clone)]
953pub struct AdvancedToolsControl {
954 control: SessionControl,
955}
956
957impl AdvancedToolsControl {
958 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
964 self.control.apply_tool_state(state).await
965 }
966
967 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
980 self.control.restore_tool_state(state).await
981 }
982}
983
984#[derive(Clone)]
985pub struct SessionCommandsControl {
986 control: SessionControl,
987}
988
989impl SessionCommandsControl {
990 pub async fn refresh_tool_surface(
995 &self,
996 reason: impl Into<String>,
997 idempotency_key: impl Into<String>,
998 ) -> Result<lash_core::SessionCommandReceipt> {
999 self.control
1000 .submit_session_command(
1001 lash_core::SessionCommand::RefreshToolSurface {
1002 reason: reason.into(),
1003 },
1004 idempotency_key,
1005 )
1006 .await
1007 }
1008
1009 pub async fn reset(
1010 &self,
1011 reason: impl Into<String>,
1012 idempotency_key: impl Into<String>,
1013 ) -> Result<lash_core::SessionCommandReceipt> {
1014 self.control
1015 .submit_session_command(
1016 lash_core::SessionCommand::ResetSession {
1017 reason: reason.into(),
1018 },
1019 idempotency_key,
1020 )
1021 .await
1022 }
1023}
1024
1025#[derive(Clone)]
1027pub struct TriggersControl {
1028 control: SessionControl,
1029}
1030
1031impl TriggersControl {
1032 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1038 self.control.list_lashlang_trigger_registrations().await
1039 }
1040
1041 pub async fn by_source_type(
1046 &self,
1047 source_type: impl Into<lash_core::TriggerSourceType>,
1048 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1049 self.control
1050 .lashlang_trigger_registrations_by_source_type(source_type)
1051 .await
1052 }
1053}
1054
1055#[derive(Clone)]
1056pub struct ProcessControl {
1057 control: SessionControl,
1058}
1059
1060impl ProcessControl {
1061 pub(crate) fn new(control: SessionControl) -> Self {
1062 Self { control }
1063 }
1064
1065 pub async fn start(
1066 &self,
1067 request: lash_core::ProcessStartRequest,
1068 scoped_effect_controller: ScopedEffectController<'_>,
1069 ) -> Result<lash_core::ProcessHandleSummary> {
1070 self.control
1071 .start_process(request, scoped_effect_controller)
1072 .await
1073 }
1074
1075 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1076 self.control.list_process_handles().await
1077 }
1078
1079 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1080 self.control.list_all_process_handles().await
1081 }
1082
1083 pub async fn await_all(&self) -> Result<()> {
1084 self.control.await_background_work().await
1085 }
1086
1087 pub async fn cancel(
1088 &self,
1089 process_id: &str,
1090 scoped_effect_controller: ScopedEffectController<'_>,
1091 ) -> Result<lash_core::ProcessCancelSummary> {
1092 self.control
1093 .cancel_process(process_id, scoped_effect_controller)
1094 .await
1095 }
1096
1097 pub async fn cancel_all(
1098 &self,
1099 scoped_effect_controller: ScopedEffectController<'_>,
1100 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1101 self.control
1102 .cancel_visible_processes(scoped_effect_controller)
1103 .await
1104 }
1105}
1106
1107#[derive(Clone)]
1108pub struct StateControl {
1109 control: SessionControl,
1110}
1111
1112impl StateControl {
1113 pub async fn export(&self) -> lash_core::SessionSnapshot {
1114 self.control.export_state().await
1115 }
1116
1117 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1118 self.control.append_messages(messages).await
1119 }
1120
1121 pub async fn append_plugin_body(
1122 &self,
1123 plugin_type: impl Into<String>,
1124 body: serde_json::Value,
1125 ) -> Result<()> {
1126 self.control.append_plugin_body(plugin_type, body).await
1127 }
1128
1129 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1130 self.control.set_persisted_state(state).await
1131 }
1132
1133 pub async fn branch_to_node(
1134 &self,
1135 target_leaf: Option<String>,
1136 ) -> Result<lash_core::SessionSnapshot> {
1137 self.control.branch_to_node(target_leaf).await
1138 }
1139
1140 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1141 self.control.persist_current_state().await
1142 }
1143
1144 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1145 self.control.session_state_service().await
1146 }
1147
1148 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1149 self.control.snapshot_execution_state().await
1150 }
1151
1152 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1153 self.control.restore_execution_state(bytes).await
1154 }
1155
1156 pub async fn compact_context(
1157 &self,
1158 instructions: Option<String>,
1159 scoped_effect_controller: ScopedEffectController<'_>,
1160 ) -> Result<bool> {
1161 self.control
1162 .compact_context(instructions, scoped_effect_controller)
1163 .await
1164 }
1165}
1166
1167#[derive(Clone)]
1168pub struct PluginActions {
1169 pub(crate) control: SessionControl,
1170}
1171
1172impl PluginActions {
1173 pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1174 self.control.call_plugin_action::<Op>(args).await
1175 }
1176}
1177
1178#[derive(Clone)]
1179pub struct ChildrenControl {
1180 control: SessionControl,
1181}
1182
1183impl ChildrenControl {
1184 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1185 self.control.create_child_session(request).await
1186 }
1187
1188 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1189 self.control.close_child_session(session_id).await
1190 }
1191
1192 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1193 self.control.activate_managed_session(session_id).await
1194 }
1195}
1196
1197#[derive(Clone)]
1198pub struct InjectionControl {
1199 control: SessionControl,
1200}
1201
1202impl InjectionControl {
1203 pub async fn inject_turn_input(
1204 &self,
1205 id: Option<String>,
1206 message: PluginMessage,
1207 ) -> Result<()> {
1208 self.control.inject_turn_input(id, message).await
1209 }
1210
1211 pub async fn inject_turn_inputs(
1212 &self,
1213 messages: Vec<lash_core::InjectedTurnInput>,
1214 ) -> Result<()> {
1215 self.control.inject_turn_inputs(messages).await
1216 }
1217}
1218
1219#[derive(Clone)]
1220pub struct ModeControl {
1221 control: SessionControl,
1222}
1223
1224impl ModeControl {
1225 pub async fn apply_session_extension(
1226 &self,
1227 extension: lash_core::ProtocolSessionExtensionHandle,
1228 ) -> Result<()> {
1229 self.control
1230 .apply_protocol_session_extension(extension)
1231 .await
1232 }
1233}