1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let drivers = self.core.work_driver.drivers().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 self.core.env.process_registry.clone(),
47 drivers.process,
48 );
49 router
50 .emit(request, scoped_effect_controller.controller())
51 .await
52 .map_err(Into::into)
53 }
54
55 pub async fn subscriptions(
56 &self,
57 filter: lash_core::TriggerSubscriptionFilter,
58 ) -> Result<Vec<lash_core::TriggerRegistration>> {
59 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60 EmbedError::Plugin(lash_core::PluginError::Session(
61 "trigger store is unavailable in this runtime".to_string(),
62 ))
63 })?;
64 let records = store.list_subscriptions(filter).await?;
65 Ok(records
66 .iter()
67 .map(lash_core::TriggerRegistration::from)
68 .collect())
69 }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74 pub(crate) core: LashCore,
75}
76
77impl Processes {
78 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79 self.core
80 .env
81 .process_registry
82 .as_ref()
83 .cloned()
84 .ok_or_else(|| {
85 EmbedError::Plugin(lash_core::PluginError::Session(
86 "process registry is unavailable in this runtime".to_string(),
87 ))
88 })
89 }
90
91 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93 }
94
95 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96 let effect_id = command.effect_id();
97 lash_core::RuntimeInvocation::effect(
98 lash_core::runtime::RuntimeScope::new("runtime"),
99 effect_id.clone(),
100 lash_core::RuntimeEffectKind::Process,
101 effect_id,
102 )
103 }
104
105 async fn run_command(
106 &self,
107 command: lash_core::ProcessCommand,
108 scoped_effect_controller: ScopedEffectController<'_>,
109 ) -> Result<lash_core::ProcessEffectOutcome> {
110 let registry = self.registry()?;
111 let invocation = Self::process_invocation(&command);
112 let outcome = scoped_effect_controller
113 .controller()
114 .execute_effect(
115 lash_core::RuntimeEffectEnvelope::new(
116 invocation,
117 lash_core::RuntimeEffectCommand::process(command),
118 ),
119 lash_core::RuntimeEffectLocalExecutor::processes(registry),
120 )
121 .await
122 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123 match outcome {
124 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126 "process effect returned non-process outcome".to_string(),
127 ))),
128 }
129 }
130
131 pub async fn start(
132 &self,
133 request: lash_core::ProcessStartRequest,
134 scoped_effect_controller: ScopedEffectController<'_>,
135 ) -> Result<lash_core::ProcessRecord> {
136 let env_ref = match request.env_spec.as_ref() {
137 Some(env_spec) => Some(
138 lash_core::runtime::persist_process_execution_env(
139 self.core.env.core.durability.process_env_store.as_ref(),
140 env_spec,
141 )
142 .await?,
143 ),
144 None => None,
145 };
146 let grant = request.grant.clone();
147 let registration = request.into_registration(env_ref);
148 let command = lash_core::ProcessCommand::Start {
149 registration,
150 grant,
151 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152 };
153 let outcome = self
154 .run_command(command, scoped_effect_controller.clone())
155 .await?;
156 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158 "process start returned the wrong outcome".to_string(),
159 )));
160 };
161 if let Some(driver) = self.core.work_driver.drivers().await.process {
162 driver.claim_and_run_pending("admin_process_start").await?;
163 }
164 Ok(record)
165 }
166
167 pub async fn list(
168 &self,
169 filter: &lash_core::ProcessListFilter,
170 ) -> Result<Vec<lash_core::ObservedProcess>> {
171 self.make_observer()?.list(filter).await.map_err(Into::into)
172 }
173
174 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175 Ok(self.make_observer()?.process(process_id).await)
176 }
177
178 pub async fn events(
179 &self,
180 process_id: &str,
181 after_sequence: u64,
182 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183 self.make_observer()?
184 .events_after(process_id, after_sequence)
185 .await
186 .map_err(Into::into)
187 }
188
189 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190 self.registry()?
191 .await_process(process_id)
192 .await
193 .map_err(Into::into)
194 }
195
196 pub async fn cancel(
197 &self,
198 process_id: &str,
199 scoped_effect_controller: ScopedEffectController<'_>,
200 ) -> Result<lash_core::ProcessCancelSummary> {
201 let command = lash_core::ProcessCommand::Cancel {
202 process_id: process_id.to_string(),
203 reason: Some("requested by host".to_string()),
204 };
205 let outcome = self
206 .run_command(command, scoped_effect_controller.clone())
207 .await?;
208 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210 "process cancel returned the wrong outcome".to_string(),
211 )));
212 };
213 Ok(lash_core::ProcessCancelSummary::from_record(record))
214 }
215
216 pub async fn signal(
217 &self,
218 process_id: &str,
219 signal_name: impl Into<String>,
220 signal_id: impl Into<String>,
221 request: lash_core::ProcessEventAppendRequest,
222 scoped_effect_controller: ScopedEffectController<'_>,
223 ) -> Result<lash_core::ProcessEvent> {
224 let signal_name = signal_name.into();
225 let event_type = request.event_type.clone();
226 let payload = request.payload.clone();
227 let command = lash_core::ProcessCommand::Signal {
228 process_id: process_id.to_string(),
229 signal_name: signal_name.clone(),
230 signal_id: signal_id.into(),
231 request,
232 };
233 let outcome = self
234 .run_command(command, scoped_effect_controller.clone())
235 .await?;
236 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238 "process signal returned the wrong outcome".to_string(),
239 )));
240 };
241 let registry = self.registry()?;
242 let waiting_ordinal =
243 registry
244 .get_process(process_id)
245 .await
246 .and_then(|record| match record.wait {
247 Some(lash_core::WaitState {
248 kind:
249 lash_core::WaitKind::Signal {
250 name,
251 event_type: wait_event_type,
252 ordinal,
253 ..
254 },
255 ..
256 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257 _ => None,
258 });
259 let ordinal = match waiting_ordinal {
260 Some(ordinal) => ordinal,
261 None => {
262 registry
263 .count_events_through(process_id, &event_type, event.sequence)
264 .await?
265 }
266 };
267 if ordinal > 0 {
268 let key = scoped_effect_controller
269 .controller()
270 .await_event_key(
271 &lash_core::ExecutionScope::process(process_id),
272 lash_core::AwaitEventWaitIdentity::process_signal(
273 process_id,
274 &signal_name,
275 ordinal,
276 ),
277 )
278 .await
279 .map_err(|err| {
280 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281 })?;
282 let _ = scoped_effect_controller
283 .controller()
284 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285 .await
286 .map_err(|err| {
287 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288 })?;
289 }
290 Ok(event)
291 }
292
293 pub async fn session_snapshot(
294 &self,
295 session_id: impl Into<String>,
296 ) -> Result<lash_core::ProcessWorkSnapshot> {
297 self.make_observer()?
298 .snapshot_for_session(session_id)
299 .await
300 .map_err(Into::into)
301 }
302
303 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304 self.make_observer()
305 }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310 pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314 pub fn config(&self) -> SessionConfigAdmin {
315 SessionConfigAdmin {
316 control: self.clone(),
317 }
318 }
319
320 pub fn tools(&self) -> ToolAdmin {
321 ToolAdmin {
322 control: self.clone(),
323 }
324 }
325
326 pub fn commands(&self) -> SessionCommandAdmin {
327 SessionCommandAdmin {
328 control: self.clone(),
329 }
330 }
331
332 pub fn triggers(&self) -> SessionTriggerAdmin {
333 SessionTriggerAdmin {
334 control: self.clone(),
335 }
336 }
337
338 pub fn state(&self) -> SessionStateAdmin {
339 SessionStateAdmin {
340 control: self.clone(),
341 }
342 }
343
344 pub fn children(&self) -> ChildSessionAdmin {
345 ChildSessionAdmin {
346 control: self.clone(),
347 }
348 }
349
350 pub fn injection(&self) -> InjectionAdmin {
351 InjectionAdmin {
352 control: self.clone(),
353 }
354 }
355
356 pub fn protocol(&self) -> ProtocolAdmin {
357 ProtocolAdmin {
358 control: self.clone(),
359 }
360 }
361
362 pub fn processes(&self) -> SessionProcessAdmin {
363 SessionProcessAdmin {
364 control: self.clone(),
365 }
366 }
367
368 async fn with_writer<F, T>(&self, f: F) -> T
373 where
374 F: AsyncFnOnce(&mut LashRuntime) -> T,
375 {
376 let writer = self.runtime.writer();
377 let mut runtime = writer.lock().await;
378 let value = f(&mut runtime).await;
379 self.runtime.publish_from(&runtime);
380 value
381 }
382
383 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384 self.update_session_config(patch.provider, patch.model, patch.prompt)
385 .await?;
386 Ok(())
387 }
388
389 async fn update_session_config(
390 &self,
391 provider: Option<ProviderHandle>,
392 model: Option<lash_core::ModelSpec>,
393 prompt: Option<PromptLayer>,
394 ) -> Result<()> {
395 self.with_writer(async |runtime: &mut LashRuntime| {
396 runtime.update_session_config(provider, model, prompt).await;
397 })
398 .await;
399 Ok(())
400 }
401
402 async fn export_state(&self) -> lash_core::SessionSnapshot {
403 self.runtime.observe().read_view.to_snapshot()
404 }
405
406 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407 self.with_writer(async |runtime: &mut LashRuntime| {
408 runtime
409 .append_session_nodes(lash_core::AppendSessionNodesRequest {
410 nodes: messages
411 .into_iter()
412 .map(lash_core::SessionAppendNode::message)
413 .collect(),
414 requires_ancestor_node_id: None,
415 })
416 .await
417 .map(|_| ())
418 .map_err(Into::into)
419 })
420 .await
421 }
422
423 async fn append_plugin_body(
424 &self,
425 plugin_type: impl Into<String>,
426 body: serde_json::Value,
427 ) -> Result<()> {
428 self.with_writer(async |runtime: &mut LashRuntime| {
429 runtime
430 .append_session_nodes(lash_core::AppendSessionNodesRequest {
431 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432 requires_ancestor_node_id: None,
433 })
434 .await
435 .map(|_| ())
436 .map_err(Into::into)
437 })
438 .await
439 }
440
441 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442 self.with_writer(async |runtime: &mut LashRuntime| {
443 runtime.set_persisted_state(state).map_err(Into::into)
444 })
445 .await
446 }
447
448 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449 self.with_writer(async |runtime: &mut LashRuntime| {
450 runtime.set_prompt_template(template).await;
451 })
452 .await;
453 Ok(())
454 }
455
456 async fn clear_prompt_template(&self) -> Result<()> {
457 self.with_writer(async |runtime: &mut LashRuntime| {
458 runtime.clear_prompt_template().await;
459 })
460 .await;
461 Ok(())
462 }
463
464 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465 self.with_writer(async |runtime: &mut LashRuntime| {
466 runtime.add_prompt_contribution(contribution).await;
467 })
468 .await;
469 Ok(())
470 }
471
472 async fn replace_prompt_slot(
473 &self,
474 slot: PromptSlot,
475 contributions: impl IntoIterator<Item = PromptContribution>,
476 ) -> Result<()> {
477 self.with_writer(async |runtime: &mut LashRuntime| {
478 runtime.replace_prompt_slot(slot, contributions).await;
479 })
480 .await;
481 Ok(())
482 }
483
484 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485 self.with_writer(async |runtime: &mut LashRuntime| {
486 runtime.clear_prompt_slot(slot).await;
487 })
488 .await;
489 Ok(())
490 }
491
492 async fn apply_protocol_session_extension(
493 &self,
494 extension: lash_core::ProtocolSessionExtensionHandle,
495 ) -> Result<()> {
496 self.with_writer(async |runtime: &mut LashRuntime| {
497 runtime
498 .apply_protocol_session_extension(extension)
499 .await
500 .map_err(Into::into)
501 })
502 .await
503 }
504
505 async fn branch_to_node(
506 &self,
507 target_leaf: Option<String>,
508 ) -> Result<lash_core::SessionSnapshot> {
509 self.with_writer(async |runtime: &mut LashRuntime| {
510 runtime
511 .branch_to_node(target_leaf)
512 .await
513 .map_err(Into::into)
514 })
515 .await
516 }
517
518 async fn await_background_work(&self) -> Result<()> {
519 self.with_writer(async |runtime: &mut LashRuntime| {
520 runtime.await_background_work().await.map_err(Into::into)
521 })
522 .await
523 }
524
525 async fn refresh_tool_catalog(&self) -> Result<()> {
526 self.with_writer(async |runtime: &mut LashRuntime| {
527 runtime
528 .refresh_session_tool_catalog()
529 .await
530 .map_err(Into::into)
531 })
532 .await
533 }
534
535 async fn submit_session_command(
536 &self,
537 command: lash_core::SessionCommand,
538 idempotency_key: impl Into<String>,
539 ) -> Result<lash_core::SessionCommandReceipt> {
540 let idempotency_key = idempotency_key.into();
541 self.with_writer(async |runtime: &mut LashRuntime| {
542 runtime
543 .submit_session_command(command, idempotency_key)
544 .await
545 .map_err(Into::into)
546 })
547 .await
548 }
549
550 async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551 self.with_writer(async |runtime: &mut LashRuntime| {
552 runtime
553 .list_trigger_registrations()
554 .await
555 .map_err(Into::into)
556 })
557 .await
558 }
559
560 async fn trigger_registrations_by_source_type(
561 &self,
562 source_type: impl Into<lash_core::TriggerEventType>,
563 ) -> Result<Vec<lash_core::TriggerRegistration>> {
564 self.with_writer(async |runtime: &mut LashRuntime| {
565 runtime
566 .trigger_registrations_by_source_type(source_type)
567 .await
568 .map_err(Into::into)
569 })
570 .await
571 }
572
573 async fn invoke_plugin_action(
574 &self,
575 name: &str,
576 args: serde_json::Value,
577 ) -> Result<ToolResult> {
578 let session_id = self.runtime.observe().session_id().to_string();
579 let writer = self.runtime.writer();
580 writer
581 .lock()
582 .await
583 .invoke_plugin_action(name, args, Some(session_id))
584 .await
585 .map_err(Into::into)
586 }
587
588 async fn call_plugin_action<Op: lash_core::PluginAction>(
589 &self,
590 args: Op::Args,
591 ) -> Result<Op::Output> {
592 let result = self
593 .invoke_plugin_action(
594 Op::NAME,
595 serde_json::to_value(args).map_err(|err| {
596 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
597 "invalid {} args: {err}",
598 Op::NAME
599 )))
600 })?,
601 )
602 .await?;
603 let Some(output) = result.as_done_output() else {
604 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
605 "{} returned a pending result where completed output is required",
606 Op::NAME
607 ))));
608 };
609 if !output.is_success() {
610 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
611 "{} failed: {}",
612 Op::NAME,
613 output.value_for_projection()
614 ))));
615 }
616 serde_json::from_value(output.value_for_projection()).map_err(|err| {
617 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
618 "invalid {} output: {err}",
619 Op::NAME
620 )))
621 })
622 }
623
624 async fn compact_context(
625 &self,
626 instructions: Option<String>,
627 scoped_effect_controller: ScopedEffectController<'_>,
628 ) -> Result<bool> {
629 self.with_writer(async |runtime: &mut LashRuntime| {
630 runtime
631 .compact_context(instructions, scoped_effect_controller)
632 .await
633 .map_err(Into::into)
634 })
635 .await
636 }
637
638 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
639 self.with_writer(async |runtime: &mut LashRuntime| {
640 runtime.await_background_work().await?;
641 Ok(runtime.export_persisted_state())
642 })
643 .await
644 }
645
646 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
647 Ok(self.runtime.observe().list_process_handles().await)
648 }
649
650 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
651 Ok(self.runtime.observe().list_all_process_handles().await)
652 }
653
654 async fn start_process(
655 &self,
656 request: lash_core::ProcessStartRequest,
657 scoped_effect_controller: ScopedEffectController<'_>,
658 ) -> Result<lash_core::ProcessHandleSummary> {
659 let writer = self.runtime.writer();
660 let runtime = writer.lock().await;
661 let session_id = runtime.session_id().to_string();
662 let processes = runtime.process_service()?;
663 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
664 let summary = processes
665 .start_from_request(&session_id, request, scope)
666 .await
667 .map_err(EmbedError::Plugin)?;
668 self.runtime.record_process_changed(
669 SessionProcessEventKind::Started,
670 vec![summary.process_id.clone()],
671 );
672 Ok(summary)
673 }
674
675 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
676 self.runtime
677 .writer()
678 .lock()
679 .await
680 .session_state_service()
681 .map_err(Into::into)
682 }
683
684 async fn cancel_process(
685 &self,
686 process_id: &str,
687 scoped_effect_controller: ScopedEffectController<'_>,
688 ) -> Result<lash_core::ProcessCancelSummary> {
689 let writer = self.runtime.writer();
690 let runtime = writer.lock().await;
691 let session_id = runtime.session_id().to_string();
692 let processes = runtime.process_service()?;
693 let cancel_ability = runtime.process_cancel_ability();
694 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
695 let summary = cancel_ability
696 .cancel_summary(
697 processes.as_ref(),
698 lash_core::ProcessCancelRequest::new(
699 &session_id,
700 process_id,
701 scope,
702 lash_core::ProcessCancelSource::HostApi,
703 )
704 .with_reason("requested by host API"),
705 )
706 .await
707 .map_err(EmbedError::Plugin)?;
708 self.runtime.record_process_changed(
709 SessionProcessEventKind::Cancelled,
710 vec![summary.process_id.clone()],
711 );
712 Ok(summary)
713 }
714
715 async fn cancel_visible_processes(
716 &self,
717 scoped_effect_controller: ScopedEffectController<'_>,
718 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
719 let writer = self.runtime.writer();
720 let runtime = writer.lock().await;
721 let session_id = runtime.session_id().to_string();
722 let processes = runtime.process_service()?;
723 let cancel_ability = runtime.process_cancel_ability();
724 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
725 let summaries = cancel_ability
726 .cancel_all_visible(
727 processes.as_ref(),
728 lash_core::ProcessCancelAllRequest::new(
729 &session_id,
730 scope,
731 lash_core::ProcessCancelSource::HostApi,
732 )
733 .with_reason("requested by host API"),
734 )
735 .await
736 .map_err(EmbedError::Plugin)?;
737 self.runtime.record_process_changed(
738 SessionProcessEventKind::Cancelled,
739 summaries
740 .iter()
741 .map(|summary| summary.process_id.clone())
742 .collect(),
743 );
744 Ok(summaries)
745 }
746
747 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
748 self.with_writer(async |runtime: &mut LashRuntime| {
749 runtime.snapshot_execution_state().await.map_err(Into::into)
750 })
751 .await
752 }
753
754 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
755 self.with_writer(async |runtime: &mut LashRuntime| {
756 runtime
757 .restore_execution_state(bytes)
758 .await
759 .map_err(Into::into)
760 })
761 .await
762 }
763
764 async fn tool_state(&self) -> Result<ToolState> {
765 self.runtime.observe().tool_state.clone().ok_or_else(|| {
766 EmbedError::Session(SessionError::Protocol(
767 "runtime session not available".to_string(),
768 ))
769 })
770 }
771
772 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
773 self.with_writer(async |runtime: &mut LashRuntime| {
774 runtime
775 .apply_tool_state(state)
776 .await
777 .map_err(EmbedError::from)
778 })
779 .await
780 }
781
782 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
783 self.with_writer(async |runtime: &mut LashRuntime| {
784 runtime
785 .restore_tool_state(state)
786 .await
787 .map_err(EmbedError::from)
788 })
789 .await
790 }
791
792 async fn set_tool_availability(
793 &self,
794 tool_id: lash_core::ToolId,
795 availability: ToolAvailability,
796 ) -> Result<u64> {
797 self.set_tool_availability_many(&[(tool_id, availability)])
798 .await
799 }
800
801 async fn set_tool_availability_many(
802 &self,
803 updates: &[(lash_core::ToolId, ToolAvailability)],
804 ) -> Result<u64> {
805 let mut state = self.tool_state().await?;
806 for (tool_id, availability) in updates {
807 state
808 .set_availability(tool_id, Some(*availability))
809 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
810 }
811 self.apply_tool_state(state).await
812 }
813
814 async fn clear_tool_availability_override(&self, tool_id: lash_core::ToolId) -> Result<u64> {
815 let mut state = self.tool_state().await?;
816 state
817 .set_availability(&tool_id, None)
818 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
819 self.apply_tool_state(state).await
820 }
821
822 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
823 Ok(self.tool_state().await?.tool_manifests())
824 }
825
826 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
827 let tool_registry = self.tool_registry().await?;
828 let handle = tool_registry
829 .add_tool_provider(provider)
830 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
831 self.refresh_tool_catalog().await?;
832 Ok(handle)
833 }
834
835 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
836 let tool_registry = self.tool_registry().await?;
837 let generation = tool_registry
838 .remove_source(handle)
839 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
840 self.refresh_tool_catalog().await?;
841 Ok(generation)
842 }
843
844 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
845 let writer = self.runtime.writer();
846 let runtime = writer.lock().await;
847 let lifecycle = runtime.session_lifecycle_service()?;
848 lifecycle.create_session(request).await.map_err(Into::into)
849 }
850
851 async fn close_child_session(&self, session_id: &str) -> Result<()> {
852 let writer = self.runtime.writer();
853 let runtime = writer.lock().await;
854 let lifecycle = runtime.session_lifecycle_service()?;
855 lifecycle
856 .close_session(session_id)
857 .await
858 .map_err(Into::into)
859 }
860
861 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
862 self.with_writer(async |runtime: &mut LashRuntime| {
863 runtime
864 .activate_managed_session(session_id)
865 .await
866 .map_err(Into::into)
867 })
868 .await
869 }
870
871 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
872 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
873 .await
874 }
875
876 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
877 for input in messages {
878 let source_key = input.id.map(|id| format!("injection:{id}"));
879 let turn_input = turn_input_from_plugin_message(input.message);
880 self.runtime
881 .enqueue_turn_input(
882 turn_input,
883 lash_core::DeliveryPolicy::EarliestSafeBoundary,
884 lash_core::SlotPolicy::Join,
885 source_key,
886 )
887 .await
888 .map(|_| ())
889 .map_err(EmbedError::Runtime)?;
890 }
891 Ok(())
892 }
893
894 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
895 self.runtime
896 .writer()
897 .lock()
898 .await
899 .plugin_session()
900 .map(|session| session.tool_registry())
901 .ok_or_else(|| {
902 EmbedError::Session(SessionError::Protocol(
903 "tool registry is unavailable in this runtime session".to_string(),
904 ))
905 })
906 }
907}
908
909fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
910 let mut input = TurnInput::empty();
911 if !message.content.is_empty() {
912 input.items.push(InputItem::Text {
913 text: message.content,
914 });
915 }
916 for (index, bytes) in message.images.into_iter().enumerate() {
917 let id = format!("injected-image-{index}");
918 input.items.push(InputItem::ImageRef { id: id.clone() });
919 input.image_blobs.insert(id, bytes);
920 }
921 input
922}
923
924#[derive(Clone)]
925pub struct SessionConfigAdmin {
926 control: SessionAdmin,
927}
928
929impl SessionConfigAdmin {
930 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
931 self.control.update_config(patch).await
932 }
933
934 pub async fn update_session_config(
935 &self,
936 provider: Option<ProviderHandle>,
937 model: Option<lash_core::ModelSpec>,
938 prompt: Option<PromptLayer>,
939 ) -> Result<()> {
940 self.control
941 .update_session_config(provider, model, prompt)
942 .await
943 }
944
945 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
946 self.control.set_prompt_template(template).await
947 }
948
949 pub async fn clear_prompt_template(&self) -> Result<()> {
950 self.control.clear_prompt_template().await
951 }
952
953 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
954 self.control.add_prompt_contribution(contribution).await
955 }
956
957 pub async fn replace_prompt_slot(
958 &self,
959 slot: PromptSlot,
960 contributions: impl IntoIterator<Item = PromptContribution>,
961 ) -> Result<()> {
962 self.control.replace_prompt_slot(slot, contributions).await
963 }
964
965 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
966 self.control.clear_prompt_slot(slot).await
967 }
968}
969
970#[derive(Clone)]
971pub struct ToolAdmin {
972 control: SessionAdmin,
973}
974
975impl ToolAdmin {
976 pub(crate) fn new(control: SessionAdmin) -> Self {
977 Self { control }
978 }
979}
980
981impl ToolAdmin {
982 pub async fn state(&self) -> Result<ToolState> {
983 self.control.tool_state().await
984 }
985
986 pub fn advanced(&self) -> AdvancedToolAdmin {
987 AdvancedToolAdmin {
988 control: self.control.clone(),
989 }
990 }
991
992 pub async fn set_availability(
993 &self,
994 tool_id: impl Into<lash_core::ToolId>,
995 availability: ToolAvailability,
996 ) -> Result<u64> {
997 self.control
998 .set_tool_availability(tool_id.into(), availability)
999 .await
1000 }
1001
1002 pub async fn set_availability_many(
1003 &self,
1004 updates: &[(lash_core::ToolId, ToolAvailability)],
1005 ) -> Result<u64> {
1006 self.control.set_tool_availability_many(updates).await
1007 }
1008
1009 pub async fn clear_availability_override(
1010 &self,
1011 tool_id: impl Into<lash_core::ToolId>,
1012 ) -> Result<u64> {
1013 self.control
1014 .clear_tool_availability_override(tool_id.into())
1015 .await
1016 }
1017
1018 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1019 self.control.active_tool_manifests().await
1020 }
1021
1022 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1023 self.control.add_tool_provider(provider).await
1024 }
1025
1026 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1027 self.control.remove_tool_source(handle).await
1028 }
1029}
1030
1031#[derive(Clone)]
1032pub struct AdvancedToolAdmin {
1033 control: SessionAdmin,
1034}
1035
1036impl AdvancedToolAdmin {
1037 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1043 self.control.apply_tool_state(state).await
1044 }
1045
1046 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1059 self.control.restore_tool_state(state).await
1060 }
1061}
1062
1063#[derive(Clone)]
1064pub struct SessionCommandAdmin {
1065 control: SessionAdmin,
1066}
1067
1068impl SessionCommandAdmin {
1069 pub async fn refresh_tool_catalog(
1074 &self,
1075 reason: impl Into<String>,
1076 idempotency_key: impl Into<String>,
1077 ) -> Result<lash_core::SessionCommandReceipt> {
1078 self.control
1079 .submit_session_command(
1080 lash_core::SessionCommand::RefreshToolCatalog {
1081 reason: reason.into(),
1082 },
1083 idempotency_key,
1084 )
1085 .await
1086 }
1087
1088 pub async fn reset(
1089 &self,
1090 reason: impl Into<String>,
1091 idempotency_key: impl Into<String>,
1092 ) -> Result<lash_core::SessionCommandReceipt> {
1093 self.control
1094 .submit_session_command(
1095 lash_core::SessionCommand::ResetSession {
1096 reason: reason.into(),
1097 },
1098 idempotency_key,
1099 )
1100 .await
1101 }
1102}
1103
1104#[derive(Clone)]
1106pub struct SessionTriggerAdmin {
1107 control: SessionAdmin,
1108}
1109
1110impl SessionTriggerAdmin {
1111 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1117 self.control.list_trigger_registrations().await
1118 }
1119
1120 pub async fn by_source_type(
1125 &self,
1126 source_type: impl Into<lash_core::TriggerEventType>,
1127 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1128 self.control
1129 .trigger_registrations_by_source_type(source_type)
1130 .await
1131 }
1132}
1133
1134#[derive(Clone)]
1135pub struct SessionProcessAdmin {
1136 control: SessionAdmin,
1137}
1138
1139impl SessionProcessAdmin {
1140 pub(crate) fn new(control: SessionAdmin) -> Self {
1141 Self { control }
1142 }
1143
1144 pub async fn start(
1145 &self,
1146 request: lash_core::ProcessStartRequest,
1147 scoped_effect_controller: ScopedEffectController<'_>,
1148 ) -> Result<lash_core::ProcessHandleSummary> {
1149 self.control
1150 .start_process(request, scoped_effect_controller)
1151 .await
1152 }
1153
1154 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1155 self.control.list_process_handles().await
1156 }
1157
1158 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1159 self.control.list_all_process_handles().await
1160 }
1161
1162 pub async fn await_all(&self) -> Result<()> {
1163 self.control.await_background_work().await
1164 }
1165
1166 pub async fn cancel(
1167 &self,
1168 process_id: &str,
1169 scoped_effect_controller: ScopedEffectController<'_>,
1170 ) -> Result<lash_core::ProcessCancelSummary> {
1171 self.control
1172 .cancel_process(process_id, scoped_effect_controller)
1173 .await
1174 }
1175
1176 pub async fn cancel_all(
1177 &self,
1178 scoped_effect_controller: ScopedEffectController<'_>,
1179 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1180 self.control
1181 .cancel_visible_processes(scoped_effect_controller)
1182 .await
1183 }
1184}
1185
1186#[derive(Clone)]
1187pub struct SessionStateAdmin {
1188 control: SessionAdmin,
1189}
1190
1191impl SessionStateAdmin {
1192 pub async fn export(&self) -> lash_core::SessionSnapshot {
1193 self.control.export_state().await
1194 }
1195
1196 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1197 self.control.append_messages(messages).await
1198 }
1199
1200 pub async fn append_plugin_body(
1201 &self,
1202 plugin_type: impl Into<String>,
1203 body: serde_json::Value,
1204 ) -> Result<()> {
1205 self.control.append_plugin_body(plugin_type, body).await
1206 }
1207
1208 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1209 self.control.set_persisted_state(state).await
1210 }
1211
1212 pub async fn branch_to_node(
1213 &self,
1214 target_leaf: Option<String>,
1215 ) -> Result<lash_core::SessionSnapshot> {
1216 self.control.branch_to_node(target_leaf).await
1217 }
1218
1219 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1220 self.control.persist_current_state().await
1221 }
1222
1223 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1224 self.control.session_state_service().await
1225 }
1226
1227 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1228 self.control.snapshot_execution_state().await
1229 }
1230
1231 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1232 self.control.restore_execution_state(bytes).await
1233 }
1234
1235 pub async fn compact_context(
1236 &self,
1237 instructions: Option<String>,
1238 scoped_effect_controller: ScopedEffectController<'_>,
1239 ) -> Result<bool> {
1240 self.control
1241 .compact_context(instructions, scoped_effect_controller)
1242 .await
1243 }
1244}
1245
1246#[derive(Clone)]
1247pub struct PluginActions {
1248 pub(crate) control: SessionAdmin,
1249}
1250
1251impl PluginActions {
1252 pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
1253 self.control.call_plugin_action::<Op>(args).await
1254 }
1255}
1256
1257#[derive(Clone)]
1258pub struct ChildSessionAdmin {
1259 control: SessionAdmin,
1260}
1261
1262impl ChildSessionAdmin {
1263 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1264 self.control.create_child_session(request).await
1265 }
1266
1267 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1268 self.control.close_child_session(session_id).await
1269 }
1270
1271 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1272 self.control.activate_managed_session(session_id).await
1273 }
1274}
1275
1276#[derive(Clone)]
1277pub struct InjectionAdmin {
1278 control: SessionAdmin,
1279}
1280
1281impl InjectionAdmin {
1282 pub async fn inject_turn_input(
1283 &self,
1284 id: Option<String>,
1285 message: PluginMessage,
1286 ) -> Result<()> {
1287 self.control.inject_turn_input(id, message).await
1288 }
1289
1290 pub async fn inject_turn_inputs(
1291 &self,
1292 messages: Vec<lash_core::InjectedTurnInput>,
1293 ) -> Result<()> {
1294 self.control.inject_turn_inputs(messages).await
1295 }
1296}
1297
1298#[derive(Clone)]
1299pub struct ProtocolAdmin {
1300 control: SessionAdmin,
1301}
1302
1303impl ProtocolAdmin {
1304 pub async fn apply_session_extension(
1305 &self,
1306 extension: lash_core::ProtocolSessionExtensionHandle,
1307 ) -> Result<()> {
1308 self.control
1309 .apply_protocol_session_extension(extension)
1310 .await
1311 }
1312}