1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let process_work_poke = self.core.process_work_runner.poke().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 Arc::clone(&self.core.env.core.durability.lashlang_artifact_store),
47 self.core.env.process_registry.clone(),
48 process_work_poke,
49 self.core.env.core.profile.host_profile_id.clone(),
50 );
51 router
52 .emit(request, scoped_effect_controller.controller())
53 .await
54 .map_err(Into::into)
55 }
56
57 pub async fn subscriptions(
58 &self,
59 filter: lash_core::TriggerSubscriptionFilter,
60 ) -> Result<Vec<lash_core::TriggerRegistration>> {
61 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
62 EmbedError::Plugin(lash_core::PluginError::Session(
63 "trigger store is unavailable in this runtime".to_string(),
64 ))
65 })?;
66 let records = store.list_subscriptions(filter).await?;
67 Ok(records
68 .iter()
69 .map(lash_core::TriggerRegistration::from)
70 .collect())
71 }
72}
73
74#[derive(Clone)]
75pub struct Processes {
76 pub(crate) core: LashCore,
77}
78
79impl Processes {
80 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
81 self.core
82 .env
83 .process_registry
84 .as_ref()
85 .cloned()
86 .ok_or_else(|| {
87 EmbedError::Plugin(lash_core::PluginError::Session(
88 "process registry is unavailable in this runtime".to_string(),
89 ))
90 })
91 }
92
93 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
94 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
95 }
96
97 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
98 let effect_id = command.effect_id();
99 lash_core::RuntimeInvocation::effect(
100 lash_core::runtime::RuntimeScope::new("runtime"),
101 effect_id.clone(),
102 lash_core::RuntimeEffectKind::Process,
103 effect_id,
104 )
105 }
106
107 async fn run_command(
108 &self,
109 command: lash_core::ProcessCommand,
110 scoped_effect_controller: ScopedEffectController<'_>,
111 ) -> Result<lash_core::ProcessEffectOutcome> {
112 let registry = self.registry()?;
113 let invocation = Self::process_invocation(&command);
114 let outcome = scoped_effect_controller
115 .controller()
116 .execute_effect(
117 lash_core::RuntimeEffectEnvelope::new(
118 invocation,
119 lash_core::RuntimeEffectCommand::process(command),
120 ),
121 lash_core::RuntimeEffectLocalExecutor::processes(registry),
122 )
123 .await
124 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
125 match outcome {
126 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
127 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
128 "process effect returned non-process outcome".to_string(),
129 ))),
130 }
131 }
132
133 pub async fn start(
134 &self,
135 request: lash_core::ProcessStartRequest,
136 scoped_effect_controller: ScopedEffectController<'_>,
137 ) -> Result<lash_core::ProcessRecord> {
138 let env_ref = match request.env_spec.as_ref() {
139 Some(env_spec) => Some(
140 lash_core::runtime::persist_process_execution_env(
141 self.core
142 .env
143 .core
144 .durability
145 .lashlang_artifact_store
146 .as_ref(),
147 env_spec,
148 )
149 .await?,
150 ),
151 None => None,
152 };
153 let grant = request.grant.clone();
154 let registration =
155 request.into_registration(self.core.env.core.profile.host_profile_id.clone(), env_ref);
156 let command = lash_core::ProcessCommand::Start {
157 registration,
158 grant,
159 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
160 };
161 let outcome = self
162 .run_command(command, scoped_effect_controller.clone())
163 .await?;
164 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
165 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
166 "process start returned the wrong outcome".to_string(),
167 )));
168 };
169 if let Some(poke) = self.core.process_work_runner.poke().await {
170 poke.poke();
171 }
172 Ok(record)
173 }
174
175 pub async fn list(
176 &self,
177 filter: &lash_core::ProcessListFilter,
178 ) -> Result<Vec<lash_core::ObservedProcess>> {
179 self.make_observer()?.list(filter).await.map_err(Into::into)
180 }
181
182 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
183 Ok(self.make_observer()?.process(process_id).await)
184 }
185
186 pub async fn events(
187 &self,
188 process_id: &str,
189 after_sequence: u64,
190 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
191 self.make_observer()?
192 .events_after(process_id, after_sequence)
193 .await
194 .map_err(Into::into)
195 }
196
197 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
198 self.registry()?
199 .await_process(process_id)
200 .await
201 .map_err(Into::into)
202 }
203
204 pub async fn cancel(
205 &self,
206 process_id: &str,
207 scoped_effect_controller: ScopedEffectController<'_>,
208 ) -> Result<lash_core::ProcessCancelSummary> {
209 let command = lash_core::ProcessCommand::Cancel {
210 process_id: process_id.to_string(),
211 reason: Some("requested by host".to_string()),
212 };
213 let outcome = self
214 .run_command(command, scoped_effect_controller.clone())
215 .await?;
216 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
217 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
218 "process cancel returned the wrong outcome".to_string(),
219 )));
220 };
221 Ok(lash_core::ProcessCancelSummary::from_record(record))
222 }
223
224 pub async fn signal(
225 &self,
226 process_id: &str,
227 signal_name: impl Into<String>,
228 signal_id: impl Into<String>,
229 request: lash_core::ProcessEventAppendRequest,
230 scoped_effect_controller: ScopedEffectController<'_>,
231 ) -> Result<lash_core::ProcessEvent> {
232 let signal_name = signal_name.into();
233 let event_type = request.event_type.clone();
234 let payload = request.payload.clone();
235 let command = lash_core::ProcessCommand::Signal {
236 process_id: process_id.to_string(),
237 signal_name: signal_name.clone(),
238 signal_id: signal_id.into(),
239 request,
240 };
241 let outcome = self
242 .run_command(command, scoped_effect_controller.clone())
243 .await?;
244 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
245 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
246 "process signal returned the wrong outcome".to_string(),
247 )));
248 };
249 let registry = self.registry()?;
250 let waiting_ordinal =
251 registry
252 .get_process(process_id)
253 .await
254 .and_then(|record| match record.wait {
255 Some(lash_core::WaitState {
256 kind:
257 lash_core::WaitKind::Signal {
258 name,
259 event_type: wait_event_type,
260 ordinal,
261 ..
262 },
263 ..
264 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
265 _ => None,
266 });
267 let ordinal = match waiting_ordinal {
268 Some(ordinal) => ordinal,
269 None => {
270 registry
271 .count_events_through(process_id, &event_type, event.sequence)
272 .await?
273 }
274 };
275 if ordinal > 0 {
276 let key = scoped_effect_controller
277 .controller()
278 .await_event_key(
279 &lash_core::ExecutionScope::process(process_id),
280 lash_core::AwaitEventWaitIdentity::process_signal(
281 process_id,
282 &signal_name,
283 ordinal,
284 ),
285 )
286 .await
287 .map_err(|err| {
288 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
289 })?;
290 let _ = scoped_effect_controller
291 .controller()
292 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
293 .await
294 .map_err(|err| {
295 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
296 })?;
297 }
298 Ok(event)
299 }
300
301 pub async fn session_snapshot(
302 &self,
303 session_id: impl Into<String>,
304 ) -> Result<lash_core::ProcessWorkSnapshot> {
305 self.make_observer()?
306 .snapshot_for_session(session_id)
307 .await
308 .map_err(Into::into)
309 }
310
311 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
312 self.make_observer()
313 }
314}
315
316#[derive(Clone)]
317pub struct SessionAdmin {
318 pub(crate) runtime: RuntimeHandle,
319}
320
321impl SessionAdmin {
322 pub fn config(&self) -> SessionConfigAdmin {
323 SessionConfigAdmin {
324 control: self.clone(),
325 }
326 }
327
328 pub fn tools(&self) -> ToolAdmin {
329 ToolAdmin {
330 control: self.clone(),
331 }
332 }
333
334 pub fn commands(&self) -> SessionCommandAdmin {
335 SessionCommandAdmin {
336 control: self.clone(),
337 }
338 }
339
340 pub fn triggers(&self) -> SessionTriggerAdmin {
341 SessionTriggerAdmin {
342 control: self.clone(),
343 }
344 }
345
346 pub fn state(&self) -> SessionStateAdmin {
347 SessionStateAdmin {
348 control: self.clone(),
349 }
350 }
351
352 pub fn children(&self) -> ChildSessionAdmin {
353 ChildSessionAdmin {
354 control: self.clone(),
355 }
356 }
357
358 pub fn injection(&self) -> InjectionAdmin {
359 InjectionAdmin {
360 control: self.clone(),
361 }
362 }
363
364 pub fn mode(&self) -> ModeAdmin {
365 ModeAdmin {
366 control: self.clone(),
367 }
368 }
369
370 pub fn processes(&self) -> SessionProcessAdmin {
371 SessionProcessAdmin {
372 control: self.clone(),
373 }
374 }
375
376 async fn with_writer<F, T>(&self, f: F) -> T
381 where
382 F: AsyncFnOnce(&mut LashRuntime) -> T,
383 {
384 let writer = self.runtime.writer();
385 let mut runtime = writer.lock().await;
386 let value = f(&mut runtime).await;
387 self.runtime.publish_from(&runtime);
388 value
389 }
390
391 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
392 self.update_session_config(patch.provider, patch.model, patch.prompt)
393 .await?;
394 Ok(())
395 }
396
397 async fn update_session_config(
398 &self,
399 provider: Option<ProviderHandle>,
400 model: Option<lash_core::ModelSpec>,
401 prompt: Option<PromptLayer>,
402 ) -> Result<()> {
403 self.with_writer(async |runtime: &mut LashRuntime| {
404 runtime.update_session_config(provider, model, prompt).await;
405 })
406 .await;
407 Ok(())
408 }
409
410 async fn export_state(&self) -> lash_core::SessionSnapshot {
411 self.runtime.observe().read_view.to_snapshot()
412 }
413
414 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
415 self.with_writer(async |runtime: &mut LashRuntime| {
416 runtime
417 .append_session_nodes(lash_core::AppendSessionNodesRequest {
418 nodes: messages
419 .into_iter()
420 .map(lash_core::SessionAppendNode::message)
421 .collect(),
422 requires_ancestor_node_id: None,
423 })
424 .await
425 .map(|_| ())
426 .map_err(Into::into)
427 })
428 .await
429 }
430
431 async fn append_plugin_body(
432 &self,
433 plugin_type: impl Into<String>,
434 body: serde_json::Value,
435 ) -> Result<()> {
436 self.with_writer(async |runtime: &mut LashRuntime| {
437 runtime
438 .append_session_nodes(lash_core::AppendSessionNodesRequest {
439 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
440 requires_ancestor_node_id: None,
441 })
442 .await
443 .map(|_| ())
444 .map_err(Into::into)
445 })
446 .await
447 }
448
449 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
450 self.with_writer(async |runtime: &mut LashRuntime| {
451 runtime.set_persisted_state(state).map_err(Into::into)
452 })
453 .await
454 }
455
456 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
457 self.with_writer(async |runtime: &mut LashRuntime| {
458 runtime.set_prompt_template(template).await;
459 })
460 .await;
461 Ok(())
462 }
463
464 async fn clear_prompt_template(&self) -> Result<()> {
465 self.with_writer(async |runtime: &mut LashRuntime| {
466 runtime.clear_prompt_template().await;
467 })
468 .await;
469 Ok(())
470 }
471
472 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
473 self.with_writer(async |runtime: &mut LashRuntime| {
474 runtime.add_prompt_contribution(contribution).await;
475 })
476 .await;
477 Ok(())
478 }
479
480 async fn replace_prompt_slot(
481 &self,
482 slot: PromptSlot,
483 contributions: impl IntoIterator<Item = PromptContribution>,
484 ) -> Result<()> {
485 self.with_writer(async |runtime: &mut LashRuntime| {
486 runtime.replace_prompt_slot(slot, contributions).await;
487 })
488 .await;
489 Ok(())
490 }
491
492 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
493 self.with_writer(async |runtime: &mut LashRuntime| {
494 runtime.clear_prompt_slot(slot).await;
495 })
496 .await;
497 Ok(())
498 }
499
500 async fn apply_protocol_session_extension(
501 &self,
502 extension: lash_core::ProtocolSessionExtensionHandle,
503 ) -> Result<()> {
504 self.with_writer(async |runtime: &mut LashRuntime| {
505 runtime
506 .apply_protocol_session_extension(extension)
507 .await
508 .map_err(Into::into)
509 })
510 .await
511 }
512
513 async fn branch_to_node(
514 &self,
515 target_leaf: Option<String>,
516 ) -> Result<lash_core::SessionSnapshot> {
517 self.with_writer(async |runtime: &mut LashRuntime| {
518 runtime
519 .branch_to_node(target_leaf)
520 .await
521 .map_err(Into::into)
522 })
523 .await
524 }
525
526 async fn await_background_work(&self) -> Result<()> {
527 self.with_writer(async |runtime: &mut LashRuntime| {
528 runtime.await_background_work().await.map_err(Into::into)
529 })
530 .await
531 }
532
533 async fn refresh_tool_catalog(&self) -> Result<()> {
534 self.with_writer(async |runtime: &mut LashRuntime| {
535 runtime
536 .refresh_session_tool_catalog()
537 .await
538 .map_err(Into::into)
539 })
540 .await
541 }
542
543 async fn submit_session_command(
544 &self,
545 command: lash_core::SessionCommand,
546 idempotency_key: impl Into<String>,
547 ) -> Result<lash_core::SessionCommandReceipt> {
548 let idempotency_key = idempotency_key.into();
549 self.with_writer(async |runtime: &mut LashRuntime| {
550 runtime
551 .submit_session_command(command, idempotency_key)
552 .await
553 .map_err(Into::into)
554 })
555 .await
556 }
557
558 async fn list_lashlang_trigger_registrations(
559 &self,
560 ) -> Result<Vec<lash_core::TriggerRegistration>> {
561 self.with_writer(async |runtime: &mut LashRuntime| {
562 runtime
563 .list_lashlang_trigger_registrations()
564 .await
565 .map_err(Into::into)
566 })
567 .await
568 }
569
570 async fn lashlang_trigger_registrations_by_source_type(
571 &self,
572 source_type: impl Into<lash_core::TriggerEventType>,
573 ) -> Result<Vec<lash_core::TriggerRegistration>> {
574 self.with_writer(async |runtime: &mut LashRuntime| {
575 runtime
576 .lashlang_trigger_registrations_by_source_type(source_type)
577 .await
578 .map_err(Into::into)
579 })
580 .await
581 }
582
583 async fn invoke_plugin_action(
584 &self,
585 name: &str,
586 args: serde_json::Value,
587 ) -> Result<ToolResult> {
588 let session_id = self.runtime.observe().session_id().to_string();
589 let writer = self.runtime.writer();
590 writer
591 .lock()
592 .await
593 .invoke_plugin_action(name, args, Some(session_id))
594 .await
595 .map_err(Into::into)
596 }
597
598 async fn call_plugin_action<Op: lash_core::PluginAction>(
599 &self,
600 args: Op::Args,
601 ) -> Result<Op::Output> {
602 let result = self
603 .invoke_plugin_action(
604 Op::NAME,
605 serde_json::to_value(args).map_err(|err| {
606 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
607 "invalid {} args: {err}",
608 Op::NAME
609 )))
610 })?,
611 )
612 .await?;
613 let Some(output) = result.as_done_output() else {
614 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
615 "{} returned a pending result where completed output is required",
616 Op::NAME
617 ))));
618 };
619 if !output.is_success() {
620 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
621 "{} failed: {}",
622 Op::NAME,
623 output.value_for_projection()
624 ))));
625 }
626 serde_json::from_value(output.value_for_projection()).map_err(|err| {
627 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
628 "invalid {} output: {err}",
629 Op::NAME
630 )))
631 })
632 }
633
634 async fn compact_context(
635 &self,
636 instructions: Option<String>,
637 scoped_effect_controller: ScopedEffectController<'_>,
638 ) -> Result<bool> {
639 self.with_writer(async |runtime: &mut LashRuntime| {
640 runtime
641 .compact_context(instructions, scoped_effect_controller)
642 .await
643 .map_err(Into::into)
644 })
645 .await
646 }
647
648 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
649 self.with_writer(async |runtime: &mut LashRuntime| {
650 runtime.await_background_work().await?;
651 Ok(runtime.export_persisted_state())
652 })
653 .await
654 }
655
656 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
657 Ok(self.runtime.observe().list_process_handles().await)
658 }
659
660 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
661 Ok(self.runtime.observe().list_all_process_handles().await)
662 }
663
664 async fn start_process(
665 &self,
666 request: lash_core::ProcessStartRequest,
667 scoped_effect_controller: ScopedEffectController<'_>,
668 ) -> Result<lash_core::ProcessHandleSummary> {
669 let writer = self.runtime.writer();
670 let runtime = writer.lock().await;
671 let session_id = runtime.session_id().to_string();
672 let processes = runtime.process_service()?;
673 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
674 let summary = processes
675 .start_from_request(&session_id, request, scope)
676 .await
677 .map_err(EmbedError::Plugin)?;
678 self.runtime.record_process_changed(
679 SessionProcessEventKind::Started,
680 vec![summary.process_id.clone()],
681 );
682 Ok(summary)
683 }
684
685 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
686 self.runtime
687 .writer()
688 .lock()
689 .await
690 .session_state_service()
691 .map_err(Into::into)
692 }
693
694 async fn cancel_process(
695 &self,
696 process_id: &str,
697 scoped_effect_controller: ScopedEffectController<'_>,
698 ) -> Result<lash_core::ProcessCancelSummary> {
699 let writer = self.runtime.writer();
700 let runtime = writer.lock().await;
701 let session_id = runtime.session_id().to_string();
702 let processes = runtime.process_service()?;
703 let cancel_ability = runtime.process_cancel_ability();
704 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
705 let summary = cancel_ability
706 .cancel_summary(
707 processes.as_ref(),
708 lash_core::ProcessCancelRequest::new(
709 &session_id,
710 process_id,
711 scope,
712 lash_core::ProcessCancelSource::HostApi,
713 )
714 .with_reason("requested by host API"),
715 )
716 .await
717 .map_err(EmbedError::Plugin)?;
718 self.runtime.record_process_changed(
719 SessionProcessEventKind::Cancelled,
720 vec![summary.process_id.clone()],
721 );
722 Ok(summary)
723 }
724
725 async fn cancel_visible_processes(
726 &self,
727 scoped_effect_controller: ScopedEffectController<'_>,
728 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
729 let writer = self.runtime.writer();
730 let runtime = writer.lock().await;
731 let session_id = runtime.session_id().to_string();
732 let processes = runtime.process_service()?;
733 let cancel_ability = runtime.process_cancel_ability();
734 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
735 let summaries = cancel_ability
736 .cancel_all_visible(
737 processes.as_ref(),
738 lash_core::ProcessCancelAllRequest::new(
739 &session_id,
740 scope,
741 lash_core::ProcessCancelSource::HostApi,
742 )
743 .with_reason("requested by host API"),
744 )
745 .await
746 .map_err(EmbedError::Plugin)?;
747 self.runtime.record_process_changed(
748 SessionProcessEventKind::Cancelled,
749 summaries
750 .iter()
751 .map(|summary| summary.process_id.clone())
752 .collect(),
753 );
754 Ok(summaries)
755 }
756
757 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
758 self.with_writer(async |runtime: &mut LashRuntime| {
759 runtime.snapshot_execution_state().await.map_err(Into::into)
760 })
761 .await
762 }
763
764 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
765 self.with_writer(async |runtime: &mut LashRuntime| {
766 runtime
767 .restore_execution_state(bytes)
768 .await
769 .map_err(Into::into)
770 })
771 .await
772 }
773
774 async fn tool_state(&self) -> Result<ToolState> {
775 self.runtime.observe().tool_state.clone().ok_or_else(|| {
776 EmbedError::Session(SessionError::Protocol(
777 "runtime session not available".to_string(),
778 ))
779 })
780 }
781
782 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
783 self.with_writer(async |runtime: &mut LashRuntime| {
784 runtime
785 .apply_tool_state(state)
786 .await
787 .map_err(EmbedError::from)
788 })
789 .await
790 }
791
792 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
793 self.with_writer(async |runtime: &mut LashRuntime| {
794 runtime
795 .restore_tool_state(state)
796 .await
797 .map_err(EmbedError::from)
798 })
799 .await
800 }
801
802 async fn set_tool_availability(
803 &self,
804 name: &str,
805 availability: ToolAvailability,
806 ) -> Result<u64> {
807 self.set_tool_availability_many(&[(name, availability)])
808 .await
809 }
810
811 async fn set_tool_availability_many<N: AsRef<str>>(
812 &self,
813 updates: &[(N, ToolAvailability)],
814 ) -> Result<u64> {
815 let mut state = self.tool_state().await?;
816 for (name, availability) in updates {
817 state
818 .set_availability(name.as_ref(), Some(*availability))
819 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
820 }
821 self.apply_tool_state(state).await
822 }
823
824 async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
825 let mut state = self.tool_state().await?;
826 state
827 .set_availability(name, None)
828 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
829 self.apply_tool_state(state).await
830 }
831
832 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
833 Ok(self.tool_state().await?.tool_manifests())
834 }
835
836 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
837 let tool_registry = self.tool_registry().await?;
838 let handle = tool_registry
839 .add_tool_provider(provider)
840 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
841 self.refresh_tool_catalog().await?;
842 Ok(handle)
843 }
844
845 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
846 let tool_registry = self.tool_registry().await?;
847 let generation = tool_registry
848 .remove_source(handle)
849 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
850 self.refresh_tool_catalog().await?;
851 Ok(generation)
852 }
853
854 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
855 let writer = self.runtime.writer();
856 let runtime = writer.lock().await;
857 let lifecycle = runtime.session_lifecycle_service()?;
858 lifecycle.create_session(request).await.map_err(Into::into)
859 }
860
861 async fn close_child_session(&self, session_id: &str) -> Result<()> {
862 let writer = self.runtime.writer();
863 let runtime = writer.lock().await;
864 let lifecycle = runtime.session_lifecycle_service()?;
865 lifecycle
866 .close_session(session_id)
867 .await
868 .map_err(Into::into)
869 }
870
871 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
872 self.with_writer(async |runtime: &mut LashRuntime| {
873 runtime
874 .activate_managed_session(session_id)
875 .await
876 .map_err(Into::into)
877 })
878 .await
879 }
880
881 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
882 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
883 .await
884 }
885
886 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
887 for input in messages {
888 let source_key = input.id.map(|id| format!("injection:{id}"));
889 let turn_input = turn_input_from_plugin_message(input.message);
890 self.runtime
891 .enqueue_turn_input(
892 turn_input,
893 lash_core::DeliveryPolicy::EarliestSafeBoundary,
894 lash_core::SlotPolicy::Join,
895 source_key,
896 )
897 .await
898 .map(|_| ())
899 .map_err(EmbedError::Runtime)?;
900 }
901 Ok(())
902 }
903
904 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
905 self.runtime
906 .writer()
907 .lock()
908 .await
909 .plugin_session()
910 .map(|session| session.tool_registry())
911 .ok_or_else(|| {
912 EmbedError::Session(SessionError::Protocol(
913 "tool registry is unavailable in this runtime session".to_string(),
914 ))
915 })
916 }
917}
918
919fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
920 let mut input = TurnInput::empty();
921 if !message.content.is_empty() {
922 input.items.push(InputItem::Text {
923 text: message.content,
924 });
925 }
926 for (index, bytes) in message.images.into_iter().enumerate() {
927 let id = format!("injected-image-{index}");
928 input.items.push(InputItem::ImageRef { id: id.clone() });
929 input.image_blobs.insert(id, bytes);
930 }
931 input
932}
933
934#[derive(Clone)]
935pub struct SessionConfigAdmin {
936 control: SessionAdmin,
937}
938
939impl SessionConfigAdmin {
940 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
941 self.control.update_config(patch).await
942 }
943
944 pub async fn update_session_config(
945 &self,
946 provider: Option<ProviderHandle>,
947 model: Option<lash_core::ModelSpec>,
948 prompt: Option<PromptLayer>,
949 ) -> Result<()> {
950 self.control
951 .update_session_config(provider, model, prompt)
952 .await
953 }
954
955 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
956 self.control.set_prompt_template(template).await
957 }
958
959 pub async fn clear_prompt_template(&self) -> Result<()> {
960 self.control.clear_prompt_template().await
961 }
962
963 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
964 self.control.add_prompt_contribution(contribution).await
965 }
966
967 pub async fn replace_prompt_slot(
968 &self,
969 slot: PromptSlot,
970 contributions: impl IntoIterator<Item = PromptContribution>,
971 ) -> Result<()> {
972 self.control.replace_prompt_slot(slot, contributions).await
973 }
974
975 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
976 self.control.clear_prompt_slot(slot).await
977 }
978}
979
980#[derive(Clone)]
981pub struct ToolAdmin {
982 control: SessionAdmin,
983}
984
985impl ToolAdmin {
986 pub(crate) fn new(control: SessionAdmin) -> Self {
987 Self { control }
988 }
989}
990
991impl ToolAdmin {
992 pub async fn state(&self) -> Result<ToolState> {
993 self.control.tool_state().await
994 }
995
996 pub fn advanced(&self) -> AdvancedToolAdmin {
997 AdvancedToolAdmin {
998 control: self.control.clone(),
999 }
1000 }
1001
1002 pub async fn set_availability(
1003 &self,
1004 name: impl AsRef<str>,
1005 availability: ToolAvailability,
1006 ) -> Result<u64> {
1007 self.control
1008 .set_tool_availability(name.as_ref(), availability)
1009 .await
1010 }
1011
1012 pub async fn set_availability_many<N: AsRef<str>>(
1013 &self,
1014 updates: &[(N, ToolAvailability)],
1015 ) -> Result<u64> {
1016 self.control.set_tool_availability_many(updates).await
1017 }
1018
1019 pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
1020 self.control
1021 .clear_tool_availability_override(name.as_ref())
1022 .await
1023 }
1024
1025 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1026 self.control.active_tool_manifests().await
1027 }
1028
1029 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1030 self.control.add_tool_provider(provider).await
1031 }
1032
1033 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1034 self.control.remove_tool_source(handle).await
1035 }
1036}
1037
1038#[derive(Clone)]
1039pub struct AdvancedToolAdmin {
1040 control: SessionAdmin,
1041}
1042
1043impl AdvancedToolAdmin {
1044 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1050 self.control.apply_tool_state(state).await
1051 }
1052
1053 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1066 self.control.restore_tool_state(state).await
1067 }
1068}
1069
1070#[derive(Clone)]
1071pub struct SessionCommandAdmin {
1072 control: SessionAdmin,
1073}
1074
1075impl SessionCommandAdmin {
1076 pub async fn refresh_tool_catalog(
1081 &self,
1082 reason: impl Into<String>,
1083 idempotency_key: impl Into<String>,
1084 ) -> Result<lash_core::SessionCommandReceipt> {
1085 self.control
1086 .submit_session_command(
1087 lash_core::SessionCommand::RefreshToolCatalog {
1088 reason: reason.into(),
1089 },
1090 idempotency_key,
1091 )
1092 .await
1093 }
1094
1095 pub async fn reset(
1096 &self,
1097 reason: impl Into<String>,
1098 idempotency_key: impl Into<String>,
1099 ) -> Result<lash_core::SessionCommandReceipt> {
1100 self.control
1101 .submit_session_command(
1102 lash_core::SessionCommand::ResetSession {
1103 reason: reason.into(),
1104 },
1105 idempotency_key,
1106 )
1107 .await
1108 }
1109}
1110
1111#[derive(Clone)]
1113pub struct SessionTriggerAdmin {
1114 control: SessionAdmin,
1115}
1116
1117impl SessionTriggerAdmin {
1118 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1124 self.control.list_lashlang_trigger_registrations().await
1125 }
1126
1127 pub async fn by_source_type(
1132 &self,
1133 source_type: impl Into<lash_core::TriggerEventType>,
1134 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1135 self.control
1136 .lashlang_trigger_registrations_by_source_type(source_type)
1137 .await
1138 }
1139}
1140
1141#[derive(Clone)]
1142pub struct SessionProcessAdmin {
1143 control: SessionAdmin,
1144}
1145
1146impl SessionProcessAdmin {
1147 pub(crate) fn new(control: SessionAdmin) -> Self {
1148 Self { control }
1149 }
1150
1151 pub async fn start(
1152 &self,
1153 request: lash_core::ProcessStartRequest,
1154 scoped_effect_controller: ScopedEffectController<'_>,
1155 ) -> Result<lash_core::ProcessHandleSummary> {
1156 self.control
1157 .start_process(request, scoped_effect_controller)
1158 .await
1159 }
1160
1161 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1162 self.control.list_process_handles().await
1163 }
1164
1165 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1166 self.control.list_all_process_handles().await
1167 }
1168
1169 pub async fn await_all(&self) -> Result<()> {
1170 self.control.await_background_work().await
1171 }
1172
1173 pub async fn cancel(
1174 &self,
1175 process_id: &str,
1176 scoped_effect_controller: ScopedEffectController<'_>,
1177 ) -> Result<lash_core::ProcessCancelSummary> {
1178 self.control
1179 .cancel_process(process_id, scoped_effect_controller)
1180 .await
1181 }
1182
1183 pub async fn cancel_all(
1184 &self,
1185 scoped_effect_controller: ScopedEffectController<'_>,
1186 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1187 self.control
1188 .cancel_visible_processes(scoped_effect_controller)
1189 .await
1190 }
1191}
1192
1193#[derive(Clone)]
1194pub struct SessionStateAdmin {
1195 control: SessionAdmin,
1196}
1197
1198impl SessionStateAdmin {
1199 pub async fn export(&self) -> lash_core::SessionSnapshot {
1200 self.control.export_state().await
1201 }
1202
1203 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1204 self.control.append_messages(messages).await
1205 }
1206
1207 pub async fn append_plugin_body(
1208 &self,
1209 plugin_type: impl Into<String>,
1210 body: serde_json::Value,
1211 ) -> Result<()> {
1212 self.control.append_plugin_body(plugin_type, body).await
1213 }
1214
1215 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1216 self.control.set_persisted_state(state).await
1217 }
1218
1219 pub async fn branch_to_node(
1220 &self,
1221 target_leaf: Option<String>,
1222 ) -> Result<lash_core::SessionSnapshot> {
1223 self.control.branch_to_node(target_leaf).await
1224 }
1225
1226 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1227 self.control.persist_current_state().await
1228 }
1229
1230 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1231 self.control.session_state_service().await
1232 }
1233
1234 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1235 self.control.snapshot_execution_state().await
1236 }
1237
1238 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1239 self.control.restore_execution_state(bytes).await
1240 }
1241
1242 pub async fn compact_context(
1243 &self,
1244 instructions: Option<String>,
1245 scoped_effect_controller: ScopedEffectController<'_>,
1246 ) -> Result<bool> {
1247 self.control
1248 .compact_context(instructions, scoped_effect_controller)
1249 .await
1250 }
1251}
1252
1253#[derive(Clone)]
1254pub struct PluginActions {
1255 pub(crate) control: SessionAdmin,
1256}
1257
1258impl PluginActions {
1259 pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1260 self.control.call_plugin_action::<Op>(args).await
1261 }
1262}
1263
1264#[derive(Clone)]
1265pub struct ChildSessionAdmin {
1266 control: SessionAdmin,
1267}
1268
1269impl ChildSessionAdmin {
1270 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1271 self.control.create_child_session(request).await
1272 }
1273
1274 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1275 self.control.close_child_session(session_id).await
1276 }
1277
1278 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1279 self.control.activate_managed_session(session_id).await
1280 }
1281}
1282
1283#[derive(Clone)]
1284pub struct InjectionAdmin {
1285 control: SessionAdmin,
1286}
1287
1288impl InjectionAdmin {
1289 pub async fn inject_turn_input(
1290 &self,
1291 id: Option<String>,
1292 message: PluginMessage,
1293 ) -> Result<()> {
1294 self.control.inject_turn_input(id, message).await
1295 }
1296
1297 pub async fn inject_turn_inputs(
1298 &self,
1299 messages: Vec<lash_core::InjectedTurnInput>,
1300 ) -> Result<()> {
1301 self.control.inject_turn_inputs(messages).await
1302 }
1303}
1304
1305#[derive(Clone)]
1306pub struct ModeAdmin {
1307 control: SessionAdmin,
1308}
1309
1310impl ModeAdmin {
1311 pub async fn apply_session_extension(
1312 &self,
1313 extension: lash_core::ProtocolSessionExtensionHandle,
1314 ) -> Result<()> {
1315 self.control
1316 .apply_protocol_session_extension(extension)
1317 .await
1318 }
1319}