1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let process_work_poke = self.core.process_work_runner.poke().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 Arc::clone(&self.core.env.core.durability.lashlang_artifact_store),
47 self.core.env.process_registry.clone(),
48 process_work_poke,
49 );
50 router
51 .emit(request, scoped_effect_controller.controller())
52 .await
53 .map_err(Into::into)
54 }
55
56 pub async fn subscriptions(
57 &self,
58 filter: lash_core::TriggerSubscriptionFilter,
59 ) -> Result<Vec<lash_core::TriggerRegistration>> {
60 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
61 EmbedError::Plugin(lash_core::PluginError::Session(
62 "trigger store is unavailable in this runtime".to_string(),
63 ))
64 })?;
65 let records = store.list_subscriptions(filter).await?;
66 Ok(records
67 .iter()
68 .map(lash_core::TriggerRegistration::from)
69 .collect())
70 }
71}
72
73#[derive(Clone)]
74pub struct Processes {
75 pub(crate) core: LashCore,
76}
77
78impl Processes {
79 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
80 self.core
81 .env
82 .process_registry
83 .as_ref()
84 .cloned()
85 .ok_or_else(|| {
86 EmbedError::Plugin(lash_core::PluginError::Session(
87 "process registry is unavailable in this runtime".to_string(),
88 ))
89 })
90 }
91
92 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
93 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
94 }
95
96 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
97 let effect_id = command.effect_id();
98 lash_core::RuntimeInvocation::effect(
99 lash_core::runtime::RuntimeScope::new("runtime"),
100 effect_id.clone(),
101 lash_core::RuntimeEffectKind::Process,
102 effect_id,
103 )
104 }
105
106 async fn run_command(
107 &self,
108 command: lash_core::ProcessCommand,
109 scoped_effect_controller: ScopedEffectController<'_>,
110 ) -> Result<lash_core::ProcessEffectOutcome> {
111 let registry = self.registry()?;
112 let invocation = Self::process_invocation(&command);
113 let outcome = scoped_effect_controller
114 .controller()
115 .execute_effect(
116 lash_core::RuntimeEffectEnvelope::new(
117 invocation,
118 lash_core::RuntimeEffectCommand::process(command),
119 ),
120 lash_core::RuntimeEffectLocalExecutor::processes(registry),
121 )
122 .await
123 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
124 match outcome {
125 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
126 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
127 "process effect returned non-process outcome".to_string(),
128 ))),
129 }
130 }
131
132 pub async fn start(
133 &self,
134 request: lash_core::ProcessStartRequest,
135 scoped_effect_controller: ScopedEffectController<'_>,
136 ) -> Result<lash_core::ProcessRecord> {
137 let env_ref = match request.env_spec.as_ref() {
138 Some(env_spec) => Some(
139 lash_core::runtime::persist_process_execution_env(
140 self.core
141 .env
142 .core
143 .durability
144 .lashlang_artifact_store
145 .as_ref(),
146 env_spec,
147 )
148 .await?,
149 ),
150 None => None,
151 };
152 let grant = request.grant.clone();
153 let registration = request.into_registration(env_ref);
154 let command = lash_core::ProcessCommand::Start {
155 registration,
156 grant,
157 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
158 };
159 let outcome = self
160 .run_command(command, scoped_effect_controller.clone())
161 .await?;
162 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
163 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
164 "process start returned the wrong outcome".to_string(),
165 )));
166 };
167 if let Some(poke) = self.core.process_work_runner.poke().await {
168 poke.poke();
169 }
170 Ok(record)
171 }
172
173 pub async fn list(
174 &self,
175 filter: &lash_core::ProcessListFilter,
176 ) -> Result<Vec<lash_core::ObservedProcess>> {
177 self.make_observer()?.list(filter).await.map_err(Into::into)
178 }
179
180 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
181 Ok(self.make_observer()?.process(process_id).await)
182 }
183
184 pub async fn events(
185 &self,
186 process_id: &str,
187 after_sequence: u64,
188 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
189 self.make_observer()?
190 .events_after(process_id, after_sequence)
191 .await
192 .map_err(Into::into)
193 }
194
195 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
196 self.registry()?
197 .await_process(process_id)
198 .await
199 .map_err(Into::into)
200 }
201
202 pub async fn cancel(
203 &self,
204 process_id: &str,
205 scoped_effect_controller: ScopedEffectController<'_>,
206 ) -> Result<lash_core::ProcessCancelSummary> {
207 let command = lash_core::ProcessCommand::Cancel {
208 process_id: process_id.to_string(),
209 reason: Some("requested by host".to_string()),
210 };
211 let outcome = self
212 .run_command(command, scoped_effect_controller.clone())
213 .await?;
214 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
215 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
216 "process cancel returned the wrong outcome".to_string(),
217 )));
218 };
219 Ok(lash_core::ProcessCancelSummary::from_record(record))
220 }
221
222 pub async fn signal(
223 &self,
224 process_id: &str,
225 signal_name: impl Into<String>,
226 signal_id: impl Into<String>,
227 request: lash_core::ProcessEventAppendRequest,
228 scoped_effect_controller: ScopedEffectController<'_>,
229 ) -> Result<lash_core::ProcessEvent> {
230 let signal_name = signal_name.into();
231 let event_type = request.event_type.clone();
232 let payload = request.payload.clone();
233 let command = lash_core::ProcessCommand::Signal {
234 process_id: process_id.to_string(),
235 signal_name: signal_name.clone(),
236 signal_id: signal_id.into(),
237 request,
238 };
239 let outcome = self
240 .run_command(command, scoped_effect_controller.clone())
241 .await?;
242 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
243 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
244 "process signal returned the wrong outcome".to_string(),
245 )));
246 };
247 let registry = self.registry()?;
248 let waiting_ordinal =
249 registry
250 .get_process(process_id)
251 .await
252 .and_then(|record| match record.wait {
253 Some(lash_core::WaitState {
254 kind:
255 lash_core::WaitKind::Signal {
256 name,
257 event_type: wait_event_type,
258 ordinal,
259 ..
260 },
261 ..
262 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
263 _ => None,
264 });
265 let ordinal = match waiting_ordinal {
266 Some(ordinal) => ordinal,
267 None => {
268 registry
269 .count_events_through(process_id, &event_type, event.sequence)
270 .await?
271 }
272 };
273 if ordinal > 0 {
274 let key = scoped_effect_controller
275 .controller()
276 .await_event_key(
277 &lash_core::ExecutionScope::process(process_id),
278 lash_core::AwaitEventWaitIdentity::process_signal(
279 process_id,
280 &signal_name,
281 ordinal,
282 ),
283 )
284 .await
285 .map_err(|err| {
286 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
287 })?;
288 let _ = scoped_effect_controller
289 .controller()
290 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
291 .await
292 .map_err(|err| {
293 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
294 })?;
295 }
296 Ok(event)
297 }
298
299 pub async fn session_snapshot(
300 &self,
301 session_id: impl Into<String>,
302 ) -> Result<lash_core::ProcessWorkSnapshot> {
303 self.make_observer()?
304 .snapshot_for_session(session_id)
305 .await
306 .map_err(Into::into)
307 }
308
309 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
310 self.make_observer()
311 }
312}
313
314#[derive(Clone)]
315pub struct SessionAdmin {
316 pub(crate) runtime: RuntimeHandle,
317}
318
319impl SessionAdmin {
320 pub fn config(&self) -> SessionConfigAdmin {
321 SessionConfigAdmin {
322 control: self.clone(),
323 }
324 }
325
326 pub fn tools(&self) -> ToolAdmin {
327 ToolAdmin {
328 control: self.clone(),
329 }
330 }
331
332 pub fn commands(&self) -> SessionCommandAdmin {
333 SessionCommandAdmin {
334 control: self.clone(),
335 }
336 }
337
338 pub fn triggers(&self) -> SessionTriggerAdmin {
339 SessionTriggerAdmin {
340 control: self.clone(),
341 }
342 }
343
344 pub fn state(&self) -> SessionStateAdmin {
345 SessionStateAdmin {
346 control: self.clone(),
347 }
348 }
349
350 pub fn children(&self) -> ChildSessionAdmin {
351 ChildSessionAdmin {
352 control: self.clone(),
353 }
354 }
355
356 pub fn injection(&self) -> InjectionAdmin {
357 InjectionAdmin {
358 control: self.clone(),
359 }
360 }
361
362 pub fn mode(&self) -> ModeAdmin {
363 ModeAdmin {
364 control: self.clone(),
365 }
366 }
367
368 pub fn processes(&self) -> SessionProcessAdmin {
369 SessionProcessAdmin {
370 control: self.clone(),
371 }
372 }
373
374 async fn with_writer<F, T>(&self, f: F) -> T
379 where
380 F: AsyncFnOnce(&mut LashRuntime) -> T,
381 {
382 let writer = self.runtime.writer();
383 let mut runtime = writer.lock().await;
384 let value = f(&mut runtime).await;
385 self.runtime.publish_from(&runtime);
386 value
387 }
388
389 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
390 self.update_session_config(patch.provider, patch.model, patch.prompt)
391 .await?;
392 Ok(())
393 }
394
395 async fn update_session_config(
396 &self,
397 provider: Option<ProviderHandle>,
398 model: Option<lash_core::ModelSpec>,
399 prompt: Option<PromptLayer>,
400 ) -> Result<()> {
401 self.with_writer(async |runtime: &mut LashRuntime| {
402 runtime.update_session_config(provider, model, prompt).await;
403 })
404 .await;
405 Ok(())
406 }
407
408 async fn export_state(&self) -> lash_core::SessionSnapshot {
409 self.runtime.observe().read_view.to_snapshot()
410 }
411
412 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
413 self.with_writer(async |runtime: &mut LashRuntime| {
414 runtime
415 .append_session_nodes(lash_core::AppendSessionNodesRequest {
416 nodes: messages
417 .into_iter()
418 .map(lash_core::SessionAppendNode::message)
419 .collect(),
420 requires_ancestor_node_id: None,
421 })
422 .await
423 .map(|_| ())
424 .map_err(Into::into)
425 })
426 .await
427 }
428
429 async fn append_plugin_body(
430 &self,
431 plugin_type: impl Into<String>,
432 body: serde_json::Value,
433 ) -> Result<()> {
434 self.with_writer(async |runtime: &mut LashRuntime| {
435 runtime
436 .append_session_nodes(lash_core::AppendSessionNodesRequest {
437 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
438 requires_ancestor_node_id: None,
439 })
440 .await
441 .map(|_| ())
442 .map_err(Into::into)
443 })
444 .await
445 }
446
447 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
448 self.with_writer(async |runtime: &mut LashRuntime| {
449 runtime.set_persisted_state(state).map_err(Into::into)
450 })
451 .await
452 }
453
454 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
455 self.with_writer(async |runtime: &mut LashRuntime| {
456 runtime.set_prompt_template(template).await;
457 })
458 .await;
459 Ok(())
460 }
461
462 async fn clear_prompt_template(&self) -> Result<()> {
463 self.with_writer(async |runtime: &mut LashRuntime| {
464 runtime.clear_prompt_template().await;
465 })
466 .await;
467 Ok(())
468 }
469
470 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
471 self.with_writer(async |runtime: &mut LashRuntime| {
472 runtime.add_prompt_contribution(contribution).await;
473 })
474 .await;
475 Ok(())
476 }
477
478 async fn replace_prompt_slot(
479 &self,
480 slot: PromptSlot,
481 contributions: impl IntoIterator<Item = PromptContribution>,
482 ) -> Result<()> {
483 self.with_writer(async |runtime: &mut LashRuntime| {
484 runtime.replace_prompt_slot(slot, contributions).await;
485 })
486 .await;
487 Ok(())
488 }
489
490 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
491 self.with_writer(async |runtime: &mut LashRuntime| {
492 runtime.clear_prompt_slot(slot).await;
493 })
494 .await;
495 Ok(())
496 }
497
498 async fn apply_protocol_session_extension(
499 &self,
500 extension: lash_core::ProtocolSessionExtensionHandle,
501 ) -> Result<()> {
502 self.with_writer(async |runtime: &mut LashRuntime| {
503 runtime
504 .apply_protocol_session_extension(extension)
505 .await
506 .map_err(Into::into)
507 })
508 .await
509 }
510
511 async fn branch_to_node(
512 &self,
513 target_leaf: Option<String>,
514 ) -> Result<lash_core::SessionSnapshot> {
515 self.with_writer(async |runtime: &mut LashRuntime| {
516 runtime
517 .branch_to_node(target_leaf)
518 .await
519 .map_err(Into::into)
520 })
521 .await
522 }
523
524 async fn await_background_work(&self) -> Result<()> {
525 self.with_writer(async |runtime: &mut LashRuntime| {
526 runtime.await_background_work().await.map_err(Into::into)
527 })
528 .await
529 }
530
531 async fn refresh_tool_catalog(&self) -> Result<()> {
532 self.with_writer(async |runtime: &mut LashRuntime| {
533 runtime
534 .refresh_session_tool_catalog()
535 .await
536 .map_err(Into::into)
537 })
538 .await
539 }
540
541 async fn submit_session_command(
542 &self,
543 command: lash_core::SessionCommand,
544 idempotency_key: impl Into<String>,
545 ) -> Result<lash_core::SessionCommandReceipt> {
546 let idempotency_key = idempotency_key.into();
547 self.with_writer(async |runtime: &mut LashRuntime| {
548 runtime
549 .submit_session_command(command, idempotency_key)
550 .await
551 .map_err(Into::into)
552 })
553 .await
554 }
555
556 async fn list_lashlang_trigger_registrations(
557 &self,
558 ) -> Result<Vec<lash_core::TriggerRegistration>> {
559 self.with_writer(async |runtime: &mut LashRuntime| {
560 runtime
561 .list_lashlang_trigger_registrations()
562 .await
563 .map_err(Into::into)
564 })
565 .await
566 }
567
568 async fn lashlang_trigger_registrations_by_source_type(
569 &self,
570 source_type: impl Into<lash_core::TriggerEventType>,
571 ) -> Result<Vec<lash_core::TriggerRegistration>> {
572 self.with_writer(async |runtime: &mut LashRuntime| {
573 runtime
574 .lashlang_trigger_registrations_by_source_type(source_type)
575 .await
576 .map_err(Into::into)
577 })
578 .await
579 }
580
581 async fn invoke_plugin_action(
582 &self,
583 name: &str,
584 args: serde_json::Value,
585 ) -> Result<ToolResult> {
586 let session_id = self.runtime.observe().session_id().to_string();
587 let writer = self.runtime.writer();
588 writer
589 .lock()
590 .await
591 .invoke_plugin_action(name, args, Some(session_id))
592 .await
593 .map_err(Into::into)
594 }
595
596 async fn call_plugin_action<Op: lash_core::PluginAction>(
597 &self,
598 args: Op::Args,
599 ) -> Result<Op::Output> {
600 let result = self
601 .invoke_plugin_action(
602 Op::NAME,
603 serde_json::to_value(args).map_err(|err| {
604 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
605 "invalid {} args: {err}",
606 Op::NAME
607 )))
608 })?,
609 )
610 .await?;
611 let Some(output) = result.as_done_output() else {
612 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
613 "{} returned a pending result where completed output is required",
614 Op::NAME
615 ))));
616 };
617 if !output.is_success() {
618 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
619 "{} failed: {}",
620 Op::NAME,
621 output.value_for_projection()
622 ))));
623 }
624 serde_json::from_value(output.value_for_projection()).map_err(|err| {
625 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
626 "invalid {} output: {err}",
627 Op::NAME
628 )))
629 })
630 }
631
632 async fn compact_context(
633 &self,
634 instructions: Option<String>,
635 scoped_effect_controller: ScopedEffectController<'_>,
636 ) -> Result<bool> {
637 self.with_writer(async |runtime: &mut LashRuntime| {
638 runtime
639 .compact_context(instructions, scoped_effect_controller)
640 .await
641 .map_err(Into::into)
642 })
643 .await
644 }
645
646 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
647 self.with_writer(async |runtime: &mut LashRuntime| {
648 runtime.await_background_work().await?;
649 Ok(runtime.export_persisted_state())
650 })
651 .await
652 }
653
654 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
655 Ok(self.runtime.observe().list_process_handles().await)
656 }
657
658 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
659 Ok(self.runtime.observe().list_all_process_handles().await)
660 }
661
662 async fn start_process(
663 &self,
664 request: lash_core::ProcessStartRequest,
665 scoped_effect_controller: ScopedEffectController<'_>,
666 ) -> Result<lash_core::ProcessHandleSummary> {
667 let writer = self.runtime.writer();
668 let runtime = writer.lock().await;
669 let session_id = runtime.session_id().to_string();
670 let processes = runtime.process_service()?;
671 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
672 let summary = processes
673 .start_from_request(&session_id, request, scope)
674 .await
675 .map_err(EmbedError::Plugin)?;
676 self.runtime.record_process_changed(
677 SessionProcessEventKind::Started,
678 vec![summary.process_id.clone()],
679 );
680 Ok(summary)
681 }
682
683 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
684 self.runtime
685 .writer()
686 .lock()
687 .await
688 .session_state_service()
689 .map_err(Into::into)
690 }
691
692 async fn cancel_process(
693 &self,
694 process_id: &str,
695 scoped_effect_controller: ScopedEffectController<'_>,
696 ) -> Result<lash_core::ProcessCancelSummary> {
697 let writer = self.runtime.writer();
698 let runtime = writer.lock().await;
699 let session_id = runtime.session_id().to_string();
700 let processes = runtime.process_service()?;
701 let cancel_ability = runtime.process_cancel_ability();
702 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703 let summary = cancel_ability
704 .cancel_summary(
705 processes.as_ref(),
706 lash_core::ProcessCancelRequest::new(
707 &session_id,
708 process_id,
709 scope,
710 lash_core::ProcessCancelSource::HostApi,
711 )
712 .with_reason("requested by host API"),
713 )
714 .await
715 .map_err(EmbedError::Plugin)?;
716 self.runtime.record_process_changed(
717 SessionProcessEventKind::Cancelled,
718 vec![summary.process_id.clone()],
719 );
720 Ok(summary)
721 }
722
723 async fn cancel_visible_processes(
724 &self,
725 scoped_effect_controller: ScopedEffectController<'_>,
726 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
727 let writer = self.runtime.writer();
728 let runtime = writer.lock().await;
729 let session_id = runtime.session_id().to_string();
730 let processes = runtime.process_service()?;
731 let cancel_ability = runtime.process_cancel_ability();
732 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
733 let summaries = cancel_ability
734 .cancel_all_visible(
735 processes.as_ref(),
736 lash_core::ProcessCancelAllRequest::new(
737 &session_id,
738 scope,
739 lash_core::ProcessCancelSource::HostApi,
740 )
741 .with_reason("requested by host API"),
742 )
743 .await
744 .map_err(EmbedError::Plugin)?;
745 self.runtime.record_process_changed(
746 SessionProcessEventKind::Cancelled,
747 summaries
748 .iter()
749 .map(|summary| summary.process_id.clone())
750 .collect(),
751 );
752 Ok(summaries)
753 }
754
755 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
756 self.with_writer(async |runtime: &mut LashRuntime| {
757 runtime.snapshot_execution_state().await.map_err(Into::into)
758 })
759 .await
760 }
761
762 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
763 self.with_writer(async |runtime: &mut LashRuntime| {
764 runtime
765 .restore_execution_state(bytes)
766 .await
767 .map_err(Into::into)
768 })
769 .await
770 }
771
772 async fn tool_state(&self) -> Result<ToolState> {
773 self.runtime.observe().tool_state.clone().ok_or_else(|| {
774 EmbedError::Session(SessionError::Protocol(
775 "runtime session not available".to_string(),
776 ))
777 })
778 }
779
780 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
781 self.with_writer(async |runtime: &mut LashRuntime| {
782 runtime
783 .apply_tool_state(state)
784 .await
785 .map_err(EmbedError::from)
786 })
787 .await
788 }
789
790 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
791 self.with_writer(async |runtime: &mut LashRuntime| {
792 runtime
793 .restore_tool_state(state)
794 .await
795 .map_err(EmbedError::from)
796 })
797 .await
798 }
799
800 async fn set_tool_availability(
801 &self,
802 name: &str,
803 availability: ToolAvailability,
804 ) -> Result<u64> {
805 self.set_tool_availability_many(&[(name, availability)])
806 .await
807 }
808
809 async fn set_tool_availability_many<N: AsRef<str>>(
810 &self,
811 updates: &[(N, ToolAvailability)],
812 ) -> Result<u64> {
813 let mut state = self.tool_state().await?;
814 for (name, availability) in updates {
815 state
816 .set_availability(name.as_ref(), Some(*availability))
817 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
818 }
819 self.apply_tool_state(state).await
820 }
821
822 async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
823 let mut state = self.tool_state().await?;
824 state
825 .set_availability(name, None)
826 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
827 self.apply_tool_state(state).await
828 }
829
830 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
831 Ok(self.tool_state().await?.tool_manifests())
832 }
833
834 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
835 let tool_registry = self.tool_registry().await?;
836 let handle = tool_registry
837 .add_tool_provider(provider)
838 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
839 self.refresh_tool_catalog().await?;
840 Ok(handle)
841 }
842
843 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
844 let tool_registry = self.tool_registry().await?;
845 let generation = tool_registry
846 .remove_source(handle)
847 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
848 self.refresh_tool_catalog().await?;
849 Ok(generation)
850 }
851
852 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
853 let writer = self.runtime.writer();
854 let runtime = writer.lock().await;
855 let lifecycle = runtime.session_lifecycle_service()?;
856 lifecycle.create_session(request).await.map_err(Into::into)
857 }
858
859 async fn close_child_session(&self, session_id: &str) -> Result<()> {
860 let writer = self.runtime.writer();
861 let runtime = writer.lock().await;
862 let lifecycle = runtime.session_lifecycle_service()?;
863 lifecycle
864 .close_session(session_id)
865 .await
866 .map_err(Into::into)
867 }
868
869 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
870 self.with_writer(async |runtime: &mut LashRuntime| {
871 runtime
872 .activate_managed_session(session_id)
873 .await
874 .map_err(Into::into)
875 })
876 .await
877 }
878
879 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
880 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
881 .await
882 }
883
884 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
885 for input in messages {
886 let source_key = input.id.map(|id| format!("injection:{id}"));
887 let turn_input = turn_input_from_plugin_message(input.message);
888 self.runtime
889 .enqueue_turn_input(
890 turn_input,
891 lash_core::DeliveryPolicy::EarliestSafeBoundary,
892 lash_core::SlotPolicy::Join,
893 source_key,
894 )
895 .await
896 .map(|_| ())
897 .map_err(EmbedError::Runtime)?;
898 }
899 Ok(())
900 }
901
902 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
903 self.runtime
904 .writer()
905 .lock()
906 .await
907 .plugin_session()
908 .map(|session| session.tool_registry())
909 .ok_or_else(|| {
910 EmbedError::Session(SessionError::Protocol(
911 "tool registry is unavailable in this runtime session".to_string(),
912 ))
913 })
914 }
915}
916
917fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
918 let mut input = TurnInput::empty();
919 if !message.content.is_empty() {
920 input.items.push(InputItem::Text {
921 text: message.content,
922 });
923 }
924 for (index, bytes) in message.images.into_iter().enumerate() {
925 let id = format!("injected-image-{index}");
926 input.items.push(InputItem::ImageRef { id: id.clone() });
927 input.image_blobs.insert(id, bytes);
928 }
929 input
930}
931
932#[derive(Clone)]
933pub struct SessionConfigAdmin {
934 control: SessionAdmin,
935}
936
937impl SessionConfigAdmin {
938 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
939 self.control.update_config(patch).await
940 }
941
942 pub async fn update_session_config(
943 &self,
944 provider: Option<ProviderHandle>,
945 model: Option<lash_core::ModelSpec>,
946 prompt: Option<PromptLayer>,
947 ) -> Result<()> {
948 self.control
949 .update_session_config(provider, model, prompt)
950 .await
951 }
952
953 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
954 self.control.set_prompt_template(template).await
955 }
956
957 pub async fn clear_prompt_template(&self) -> Result<()> {
958 self.control.clear_prompt_template().await
959 }
960
961 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
962 self.control.add_prompt_contribution(contribution).await
963 }
964
965 pub async fn replace_prompt_slot(
966 &self,
967 slot: PromptSlot,
968 contributions: impl IntoIterator<Item = PromptContribution>,
969 ) -> Result<()> {
970 self.control.replace_prompt_slot(slot, contributions).await
971 }
972
973 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
974 self.control.clear_prompt_slot(slot).await
975 }
976}
977
978#[derive(Clone)]
979pub struct ToolAdmin {
980 control: SessionAdmin,
981}
982
983impl ToolAdmin {
984 pub(crate) fn new(control: SessionAdmin) -> Self {
985 Self { control }
986 }
987}
988
989impl ToolAdmin {
990 pub async fn state(&self) -> Result<ToolState> {
991 self.control.tool_state().await
992 }
993
994 pub fn advanced(&self) -> AdvancedToolAdmin {
995 AdvancedToolAdmin {
996 control: self.control.clone(),
997 }
998 }
999
1000 pub async fn set_availability(
1001 &self,
1002 name: impl AsRef<str>,
1003 availability: ToolAvailability,
1004 ) -> Result<u64> {
1005 self.control
1006 .set_tool_availability(name.as_ref(), availability)
1007 .await
1008 }
1009
1010 pub async fn set_availability_many<N: AsRef<str>>(
1011 &self,
1012 updates: &[(N, ToolAvailability)],
1013 ) -> Result<u64> {
1014 self.control.set_tool_availability_many(updates).await
1015 }
1016
1017 pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
1018 self.control
1019 .clear_tool_availability_override(name.as_ref())
1020 .await
1021 }
1022
1023 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1024 self.control.active_tool_manifests().await
1025 }
1026
1027 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1028 self.control.add_tool_provider(provider).await
1029 }
1030
1031 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1032 self.control.remove_tool_source(handle).await
1033 }
1034}
1035
1036#[derive(Clone)]
1037pub struct AdvancedToolAdmin {
1038 control: SessionAdmin,
1039}
1040
1041impl AdvancedToolAdmin {
1042 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1048 self.control.apply_tool_state(state).await
1049 }
1050
1051 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1064 self.control.restore_tool_state(state).await
1065 }
1066}
1067
1068#[derive(Clone)]
1069pub struct SessionCommandAdmin {
1070 control: SessionAdmin,
1071}
1072
1073impl SessionCommandAdmin {
1074 pub async fn refresh_tool_catalog(
1079 &self,
1080 reason: impl Into<String>,
1081 idempotency_key: impl Into<String>,
1082 ) -> Result<lash_core::SessionCommandReceipt> {
1083 self.control
1084 .submit_session_command(
1085 lash_core::SessionCommand::RefreshToolCatalog {
1086 reason: reason.into(),
1087 },
1088 idempotency_key,
1089 )
1090 .await
1091 }
1092
1093 pub async fn reset(
1094 &self,
1095 reason: impl Into<String>,
1096 idempotency_key: impl Into<String>,
1097 ) -> Result<lash_core::SessionCommandReceipt> {
1098 self.control
1099 .submit_session_command(
1100 lash_core::SessionCommand::ResetSession {
1101 reason: reason.into(),
1102 },
1103 idempotency_key,
1104 )
1105 .await
1106 }
1107}
1108
1109#[derive(Clone)]
1111pub struct SessionTriggerAdmin {
1112 control: SessionAdmin,
1113}
1114
1115impl SessionTriggerAdmin {
1116 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1122 self.control.list_lashlang_trigger_registrations().await
1123 }
1124
1125 pub async fn by_source_type(
1130 &self,
1131 source_type: impl Into<lash_core::TriggerEventType>,
1132 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1133 self.control
1134 .lashlang_trigger_registrations_by_source_type(source_type)
1135 .await
1136 }
1137}
1138
1139#[derive(Clone)]
1140pub struct SessionProcessAdmin {
1141 control: SessionAdmin,
1142}
1143
1144impl SessionProcessAdmin {
1145 pub(crate) fn new(control: SessionAdmin) -> Self {
1146 Self { control }
1147 }
1148
1149 pub async fn start(
1150 &self,
1151 request: lash_core::ProcessStartRequest,
1152 scoped_effect_controller: ScopedEffectController<'_>,
1153 ) -> Result<lash_core::ProcessHandleSummary> {
1154 self.control
1155 .start_process(request, scoped_effect_controller)
1156 .await
1157 }
1158
1159 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1160 self.control.list_process_handles().await
1161 }
1162
1163 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1164 self.control.list_all_process_handles().await
1165 }
1166
1167 pub async fn await_all(&self) -> Result<()> {
1168 self.control.await_background_work().await
1169 }
1170
1171 pub async fn cancel(
1172 &self,
1173 process_id: &str,
1174 scoped_effect_controller: ScopedEffectController<'_>,
1175 ) -> Result<lash_core::ProcessCancelSummary> {
1176 self.control
1177 .cancel_process(process_id, scoped_effect_controller)
1178 .await
1179 }
1180
1181 pub async fn cancel_all(
1182 &self,
1183 scoped_effect_controller: ScopedEffectController<'_>,
1184 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1185 self.control
1186 .cancel_visible_processes(scoped_effect_controller)
1187 .await
1188 }
1189}
1190
1191#[derive(Clone)]
1192pub struct SessionStateAdmin {
1193 control: SessionAdmin,
1194}
1195
1196impl SessionStateAdmin {
1197 pub async fn export(&self) -> lash_core::SessionSnapshot {
1198 self.control.export_state().await
1199 }
1200
1201 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1202 self.control.append_messages(messages).await
1203 }
1204
1205 pub async fn append_plugin_body(
1206 &self,
1207 plugin_type: impl Into<String>,
1208 body: serde_json::Value,
1209 ) -> Result<()> {
1210 self.control.append_plugin_body(plugin_type, body).await
1211 }
1212
1213 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1214 self.control.set_persisted_state(state).await
1215 }
1216
1217 pub async fn branch_to_node(
1218 &self,
1219 target_leaf: Option<String>,
1220 ) -> Result<lash_core::SessionSnapshot> {
1221 self.control.branch_to_node(target_leaf).await
1222 }
1223
1224 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1225 self.control.persist_current_state().await
1226 }
1227
1228 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1229 self.control.session_state_service().await
1230 }
1231
1232 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1233 self.control.snapshot_execution_state().await
1234 }
1235
1236 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1237 self.control.restore_execution_state(bytes).await
1238 }
1239
1240 pub async fn compact_context(
1241 &self,
1242 instructions: Option<String>,
1243 scoped_effect_controller: ScopedEffectController<'_>,
1244 ) -> Result<bool> {
1245 self.control
1246 .compact_context(instructions, scoped_effect_controller)
1247 .await
1248 }
1249}
1250
1251#[derive(Clone)]
1252pub struct PluginActions {
1253 pub(crate) control: SessionAdmin,
1254}
1255
1256impl PluginActions {
1257 pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1258 self.control.call_plugin_action::<Op>(args).await
1259 }
1260}
1261
1262#[derive(Clone)]
1263pub struct ChildSessionAdmin {
1264 control: SessionAdmin,
1265}
1266
1267impl ChildSessionAdmin {
1268 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1269 self.control.create_child_session(request).await
1270 }
1271
1272 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1273 self.control.close_child_session(session_id).await
1274 }
1275
1276 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1277 self.control.activate_managed_session(session_id).await
1278 }
1279}
1280
1281#[derive(Clone)]
1282pub struct InjectionAdmin {
1283 control: SessionAdmin,
1284}
1285
1286impl InjectionAdmin {
1287 pub async fn inject_turn_input(
1288 &self,
1289 id: Option<String>,
1290 message: PluginMessage,
1291 ) -> Result<()> {
1292 self.control.inject_turn_input(id, message).await
1293 }
1294
1295 pub async fn inject_turn_inputs(
1296 &self,
1297 messages: Vec<lash_core::InjectedTurnInput>,
1298 ) -> Result<()> {
1299 self.control.inject_turn_inputs(messages).await
1300 }
1301}
1302
1303#[derive(Clone)]
1304pub struct ModeAdmin {
1305 control: SessionAdmin,
1306}
1307
1308impl ModeAdmin {
1309 pub async fn apply_session_extension(
1310 &self,
1311 extension: lash_core::ProtocolSessionExtensionHandle,
1312 ) -> Result<()> {
1313 self.control
1314 .apply_protocol_session_extension(extension)
1315 .await
1316 }
1317}