1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginAction};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let process_work_poke = self.core.process_work_runner.poke().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 self.core.env.process_registry.clone(),
47 process_work_poke,
48 );
49 router
50 .emit(request, scoped_effect_controller.controller())
51 .await
52 .map_err(Into::into)
53 }
54
55 pub async fn subscriptions(
56 &self,
57 filter: lash_core::TriggerSubscriptionFilter,
58 ) -> Result<Vec<lash_core::TriggerRegistration>> {
59 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60 EmbedError::Plugin(lash_core::PluginError::Session(
61 "trigger store is unavailable in this runtime".to_string(),
62 ))
63 })?;
64 let records = store.list_subscriptions(filter).await?;
65 Ok(records
66 .iter()
67 .map(lash_core::TriggerRegistration::from)
68 .collect())
69 }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74 pub(crate) core: LashCore,
75}
76
77impl Processes {
78 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79 self.core
80 .env
81 .process_registry
82 .as_ref()
83 .cloned()
84 .ok_or_else(|| {
85 EmbedError::Plugin(lash_core::PluginError::Session(
86 "process registry is unavailable in this runtime".to_string(),
87 ))
88 })
89 }
90
91 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93 }
94
95 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96 let effect_id = command.effect_id();
97 lash_core::RuntimeInvocation::effect(
98 lash_core::runtime::RuntimeScope::new("runtime"),
99 effect_id.clone(),
100 lash_core::RuntimeEffectKind::Process,
101 effect_id,
102 )
103 }
104
105 async fn run_command(
106 &self,
107 command: lash_core::ProcessCommand,
108 scoped_effect_controller: ScopedEffectController<'_>,
109 ) -> Result<lash_core::ProcessEffectOutcome> {
110 let registry = self.registry()?;
111 let invocation = Self::process_invocation(&command);
112 let outcome = scoped_effect_controller
113 .controller()
114 .execute_effect(
115 lash_core::RuntimeEffectEnvelope::new(
116 invocation,
117 lash_core::RuntimeEffectCommand::process(command),
118 ),
119 lash_core::RuntimeEffectLocalExecutor::processes(registry),
120 )
121 .await
122 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123 match outcome {
124 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126 "process effect returned non-process outcome".to_string(),
127 ))),
128 }
129 }
130
131 pub async fn start(
132 &self,
133 request: lash_core::ProcessStartRequest,
134 scoped_effect_controller: ScopedEffectController<'_>,
135 ) -> Result<lash_core::ProcessRecord> {
136 let env_ref = match request.env_spec.as_ref() {
137 Some(env_spec) => Some(
138 lash_core::runtime::persist_process_execution_env(
139 self.core.env.core.durability.process_env_store.as_ref(),
140 env_spec,
141 )
142 .await?,
143 ),
144 None => None,
145 };
146 let grant = request.grant.clone();
147 let registration = request.into_registration(env_ref);
148 let command = lash_core::ProcessCommand::Start {
149 registration,
150 grant,
151 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152 };
153 let outcome = self
154 .run_command(command, scoped_effect_controller.clone())
155 .await?;
156 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158 "process start returned the wrong outcome".to_string(),
159 )));
160 };
161 if let Some(poke) = self.core.process_work_runner.poke().await {
162 poke.poke();
163 }
164 Ok(record)
165 }
166
167 pub async fn list(
168 &self,
169 filter: &lash_core::ProcessListFilter,
170 ) -> Result<Vec<lash_core::ObservedProcess>> {
171 self.make_observer()?.list(filter).await.map_err(Into::into)
172 }
173
174 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175 Ok(self.make_observer()?.process(process_id).await)
176 }
177
178 pub async fn events(
179 &self,
180 process_id: &str,
181 after_sequence: u64,
182 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183 self.make_observer()?
184 .events_after(process_id, after_sequence)
185 .await
186 .map_err(Into::into)
187 }
188
189 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190 self.registry()?
191 .await_process(process_id)
192 .await
193 .map_err(Into::into)
194 }
195
196 pub async fn cancel(
197 &self,
198 process_id: &str,
199 scoped_effect_controller: ScopedEffectController<'_>,
200 ) -> Result<lash_core::ProcessCancelSummary> {
201 let command = lash_core::ProcessCommand::Cancel {
202 process_id: process_id.to_string(),
203 reason: Some("requested by host".to_string()),
204 };
205 let outcome = self
206 .run_command(command, scoped_effect_controller.clone())
207 .await?;
208 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210 "process cancel returned the wrong outcome".to_string(),
211 )));
212 };
213 Ok(lash_core::ProcessCancelSummary::from_record(record))
214 }
215
216 pub async fn signal(
217 &self,
218 process_id: &str,
219 signal_name: impl Into<String>,
220 signal_id: impl Into<String>,
221 request: lash_core::ProcessEventAppendRequest,
222 scoped_effect_controller: ScopedEffectController<'_>,
223 ) -> Result<lash_core::ProcessEvent> {
224 let signal_name = signal_name.into();
225 let event_type = request.event_type.clone();
226 let payload = request.payload.clone();
227 let command = lash_core::ProcessCommand::Signal {
228 process_id: process_id.to_string(),
229 signal_name: signal_name.clone(),
230 signal_id: signal_id.into(),
231 request,
232 };
233 let outcome = self
234 .run_command(command, scoped_effect_controller.clone())
235 .await?;
236 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238 "process signal returned the wrong outcome".to_string(),
239 )));
240 };
241 let registry = self.registry()?;
242 let waiting_ordinal =
243 registry
244 .get_process(process_id)
245 .await
246 .and_then(|record| match record.wait {
247 Some(lash_core::WaitState {
248 kind:
249 lash_core::WaitKind::Signal {
250 name,
251 event_type: wait_event_type,
252 ordinal,
253 ..
254 },
255 ..
256 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257 _ => None,
258 });
259 let ordinal = match waiting_ordinal {
260 Some(ordinal) => ordinal,
261 None => {
262 registry
263 .count_events_through(process_id, &event_type, event.sequence)
264 .await?
265 }
266 };
267 if ordinal > 0 {
268 let key = scoped_effect_controller
269 .controller()
270 .await_event_key(
271 &lash_core::ExecutionScope::process(process_id),
272 lash_core::AwaitEventWaitIdentity::process_signal(
273 process_id,
274 &signal_name,
275 ordinal,
276 ),
277 )
278 .await
279 .map_err(|err| {
280 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281 })?;
282 let _ = scoped_effect_controller
283 .controller()
284 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285 .await
286 .map_err(|err| {
287 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288 })?;
289 }
290 Ok(event)
291 }
292
293 pub async fn session_snapshot(
294 &self,
295 session_id: impl Into<String>,
296 ) -> Result<lash_core::ProcessWorkSnapshot> {
297 self.make_observer()?
298 .snapshot_for_session(session_id)
299 .await
300 .map_err(Into::into)
301 }
302
303 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304 self.make_observer()
305 }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310 pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314 pub fn config(&self) -> SessionConfigAdmin {
315 SessionConfigAdmin {
316 control: self.clone(),
317 }
318 }
319
320 pub fn tools(&self) -> ToolAdmin {
321 ToolAdmin {
322 control: self.clone(),
323 }
324 }
325
326 pub fn commands(&self) -> SessionCommandAdmin {
327 SessionCommandAdmin {
328 control: self.clone(),
329 }
330 }
331
332 pub fn triggers(&self) -> SessionTriggerAdmin {
333 SessionTriggerAdmin {
334 control: self.clone(),
335 }
336 }
337
338 pub fn state(&self) -> SessionStateAdmin {
339 SessionStateAdmin {
340 control: self.clone(),
341 }
342 }
343
344 pub fn children(&self) -> ChildSessionAdmin {
345 ChildSessionAdmin {
346 control: self.clone(),
347 }
348 }
349
350 pub fn injection(&self) -> InjectionAdmin {
351 InjectionAdmin {
352 control: self.clone(),
353 }
354 }
355
356 pub fn protocol(&self) -> ProtocolAdmin {
357 ProtocolAdmin {
358 control: self.clone(),
359 }
360 }
361
362 pub fn processes(&self) -> SessionProcessAdmin {
363 SessionProcessAdmin {
364 control: self.clone(),
365 }
366 }
367
368 async fn with_writer<F, T>(&self, f: F) -> T
373 where
374 F: AsyncFnOnce(&mut LashRuntime) -> T,
375 {
376 let writer = self.runtime.writer();
377 let mut runtime = writer.lock().await;
378 let value = f(&mut runtime).await;
379 self.runtime.publish_from(&runtime);
380 value
381 }
382
383 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384 self.update_session_config(patch.provider, patch.model, patch.prompt)
385 .await?;
386 Ok(())
387 }
388
389 async fn update_session_config(
390 &self,
391 provider: Option<ProviderHandle>,
392 model: Option<lash_core::ModelSpec>,
393 prompt: Option<PromptLayer>,
394 ) -> Result<()> {
395 self.with_writer(async |runtime: &mut LashRuntime| {
396 runtime.update_session_config(provider, model, prompt).await;
397 })
398 .await;
399 Ok(())
400 }
401
402 async fn export_state(&self) -> lash_core::SessionSnapshot {
403 self.runtime.observe().read_view.to_snapshot()
404 }
405
406 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407 self.with_writer(async |runtime: &mut LashRuntime| {
408 runtime
409 .append_session_nodes(lash_core::AppendSessionNodesRequest {
410 nodes: messages
411 .into_iter()
412 .map(lash_core::SessionAppendNode::message)
413 .collect(),
414 requires_ancestor_node_id: None,
415 })
416 .await
417 .map(|_| ())
418 .map_err(Into::into)
419 })
420 .await
421 }
422
423 async fn append_plugin_body(
424 &self,
425 plugin_type: impl Into<String>,
426 body: serde_json::Value,
427 ) -> Result<()> {
428 self.with_writer(async |runtime: &mut LashRuntime| {
429 runtime
430 .append_session_nodes(lash_core::AppendSessionNodesRequest {
431 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432 requires_ancestor_node_id: None,
433 })
434 .await
435 .map(|_| ())
436 .map_err(Into::into)
437 })
438 .await
439 }
440
441 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442 self.with_writer(async |runtime: &mut LashRuntime| {
443 runtime.set_persisted_state(state).map_err(Into::into)
444 })
445 .await
446 }
447
448 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449 self.with_writer(async |runtime: &mut LashRuntime| {
450 runtime.set_prompt_template(template).await;
451 })
452 .await;
453 Ok(())
454 }
455
456 async fn clear_prompt_template(&self) -> Result<()> {
457 self.with_writer(async |runtime: &mut LashRuntime| {
458 runtime.clear_prompt_template().await;
459 })
460 .await;
461 Ok(())
462 }
463
464 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465 self.with_writer(async |runtime: &mut LashRuntime| {
466 runtime.add_prompt_contribution(contribution).await;
467 })
468 .await;
469 Ok(())
470 }
471
472 async fn replace_prompt_slot(
473 &self,
474 slot: PromptSlot,
475 contributions: impl IntoIterator<Item = PromptContribution>,
476 ) -> Result<()> {
477 self.with_writer(async |runtime: &mut LashRuntime| {
478 runtime.replace_prompt_slot(slot, contributions).await;
479 })
480 .await;
481 Ok(())
482 }
483
484 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485 self.with_writer(async |runtime: &mut LashRuntime| {
486 runtime.clear_prompt_slot(slot).await;
487 })
488 .await;
489 Ok(())
490 }
491
492 async fn apply_protocol_session_extension(
493 &self,
494 extension: lash_core::ProtocolSessionExtensionHandle,
495 ) -> Result<()> {
496 self.with_writer(async |runtime: &mut LashRuntime| {
497 runtime
498 .apply_protocol_session_extension(extension)
499 .await
500 .map_err(Into::into)
501 })
502 .await
503 }
504
505 async fn branch_to_node(
506 &self,
507 target_leaf: Option<String>,
508 ) -> Result<lash_core::SessionSnapshot> {
509 self.with_writer(async |runtime: &mut LashRuntime| {
510 runtime
511 .branch_to_node(target_leaf)
512 .await
513 .map_err(Into::into)
514 })
515 .await
516 }
517
518 async fn await_background_work(&self) -> Result<()> {
519 self.with_writer(async |runtime: &mut LashRuntime| {
520 runtime.await_background_work().await.map_err(Into::into)
521 })
522 .await
523 }
524
525 async fn refresh_tool_catalog(&self) -> Result<()> {
526 self.with_writer(async |runtime: &mut LashRuntime| {
527 runtime
528 .refresh_session_tool_catalog()
529 .await
530 .map_err(Into::into)
531 })
532 .await
533 }
534
535 async fn submit_session_command(
536 &self,
537 command: lash_core::SessionCommand,
538 idempotency_key: impl Into<String>,
539 ) -> Result<lash_core::SessionCommandReceipt> {
540 let idempotency_key = idempotency_key.into();
541 self.with_writer(async |runtime: &mut LashRuntime| {
542 runtime
543 .submit_session_command(command, idempotency_key)
544 .await
545 .map_err(Into::into)
546 })
547 .await
548 }
549
550 async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551 self.with_writer(async |runtime: &mut LashRuntime| {
552 runtime
553 .list_trigger_registrations()
554 .await
555 .map_err(Into::into)
556 })
557 .await
558 }
559
560 async fn trigger_registrations_by_source_type(
561 &self,
562 source_type: impl Into<lash_core::TriggerEventType>,
563 ) -> Result<Vec<lash_core::TriggerRegistration>> {
564 self.with_writer(async |runtime: &mut LashRuntime| {
565 runtime
566 .trigger_registrations_by_source_type(source_type)
567 .await
568 .map_err(Into::into)
569 })
570 .await
571 }
572
573 async fn invoke_plugin_action(
574 &self,
575 name: &str,
576 args: serde_json::Value,
577 ) -> Result<ToolResult> {
578 let session_id = self.runtime.observe().session_id().to_string();
579 let writer = self.runtime.writer();
580 writer
581 .lock()
582 .await
583 .invoke_plugin_action(name, args, Some(session_id))
584 .await
585 .map_err(Into::into)
586 }
587
588 async fn call_plugin_action<Op: lash_core::PluginAction>(
589 &self,
590 args: Op::Args,
591 ) -> Result<Op::Output> {
592 let result = self
593 .invoke_plugin_action(
594 Op::NAME,
595 serde_json::to_value(args).map_err(|err| {
596 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
597 "invalid {} args: {err}",
598 Op::NAME
599 )))
600 })?,
601 )
602 .await?;
603 let Some(output) = result.as_done_output() else {
604 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
605 "{} returned a pending result where completed output is required",
606 Op::NAME
607 ))));
608 };
609 if !output.is_success() {
610 return Err(EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
611 "{} failed: {}",
612 Op::NAME,
613 output.value_for_projection()
614 ))));
615 }
616 serde_json::from_value(output.value_for_projection()).map_err(|err| {
617 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
618 "invalid {} output: {err}",
619 Op::NAME
620 )))
621 })
622 }
623
624 async fn compact_context(
625 &self,
626 instructions: Option<String>,
627 scoped_effect_controller: ScopedEffectController<'_>,
628 ) -> Result<bool> {
629 self.with_writer(async |runtime: &mut LashRuntime| {
630 runtime
631 .compact_context(instructions, scoped_effect_controller)
632 .await
633 .map_err(Into::into)
634 })
635 .await
636 }
637
638 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
639 self.with_writer(async |runtime: &mut LashRuntime| {
640 runtime.await_background_work().await?;
641 Ok(runtime.export_persisted_state())
642 })
643 .await
644 }
645
646 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
647 Ok(self.runtime.observe().list_process_handles().await)
648 }
649
650 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
651 Ok(self.runtime.observe().list_all_process_handles().await)
652 }
653
654 async fn start_process(
655 &self,
656 request: lash_core::ProcessStartRequest,
657 scoped_effect_controller: ScopedEffectController<'_>,
658 ) -> Result<lash_core::ProcessHandleSummary> {
659 let writer = self.runtime.writer();
660 let runtime = writer.lock().await;
661 let session_id = runtime.session_id().to_string();
662 let processes = runtime.process_service()?;
663 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
664 let summary = processes
665 .start_from_request(&session_id, request, scope)
666 .await
667 .map_err(EmbedError::Plugin)?;
668 self.runtime.record_process_changed(
669 SessionProcessEventKind::Started,
670 vec![summary.process_id.clone()],
671 );
672 Ok(summary)
673 }
674
675 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
676 self.runtime
677 .writer()
678 .lock()
679 .await
680 .session_state_service()
681 .map_err(Into::into)
682 }
683
684 async fn cancel_process(
685 &self,
686 process_id: &str,
687 scoped_effect_controller: ScopedEffectController<'_>,
688 ) -> Result<lash_core::ProcessCancelSummary> {
689 let writer = self.runtime.writer();
690 let runtime = writer.lock().await;
691 let session_id = runtime.session_id().to_string();
692 let processes = runtime.process_service()?;
693 let cancel_ability = runtime.process_cancel_ability();
694 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
695 let summary = cancel_ability
696 .cancel_summary(
697 processes.as_ref(),
698 lash_core::ProcessCancelRequest::new(
699 &session_id,
700 process_id,
701 scope,
702 lash_core::ProcessCancelSource::HostApi,
703 )
704 .with_reason("requested by host API"),
705 )
706 .await
707 .map_err(EmbedError::Plugin)?;
708 self.runtime.record_process_changed(
709 SessionProcessEventKind::Cancelled,
710 vec![summary.process_id.clone()],
711 );
712 Ok(summary)
713 }
714
715 async fn cancel_visible_processes(
716 &self,
717 scoped_effect_controller: ScopedEffectController<'_>,
718 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
719 let writer = self.runtime.writer();
720 let runtime = writer.lock().await;
721 let session_id = runtime.session_id().to_string();
722 let processes = runtime.process_service()?;
723 let cancel_ability = runtime.process_cancel_ability();
724 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
725 let summaries = cancel_ability
726 .cancel_all_visible(
727 processes.as_ref(),
728 lash_core::ProcessCancelAllRequest::new(
729 &session_id,
730 scope,
731 lash_core::ProcessCancelSource::HostApi,
732 )
733 .with_reason("requested by host API"),
734 )
735 .await
736 .map_err(EmbedError::Plugin)?;
737 self.runtime.record_process_changed(
738 SessionProcessEventKind::Cancelled,
739 summaries
740 .iter()
741 .map(|summary| summary.process_id.clone())
742 .collect(),
743 );
744 Ok(summaries)
745 }
746
747 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
748 self.with_writer(async |runtime: &mut LashRuntime| {
749 runtime.snapshot_execution_state().await.map_err(Into::into)
750 })
751 .await
752 }
753
754 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
755 self.with_writer(async |runtime: &mut LashRuntime| {
756 runtime
757 .restore_execution_state(bytes)
758 .await
759 .map_err(Into::into)
760 })
761 .await
762 }
763
764 async fn tool_state(&self) -> Result<ToolState> {
765 self.runtime.observe().tool_state.clone().ok_or_else(|| {
766 EmbedError::Session(SessionError::Protocol(
767 "runtime session not available".to_string(),
768 ))
769 })
770 }
771
772 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
773 self.with_writer(async |runtime: &mut LashRuntime| {
774 runtime
775 .apply_tool_state(state)
776 .await
777 .map_err(EmbedError::from)
778 })
779 .await
780 }
781
782 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
783 self.with_writer(async |runtime: &mut LashRuntime| {
784 runtime
785 .restore_tool_state(state)
786 .await
787 .map_err(EmbedError::from)
788 })
789 .await
790 }
791
792 async fn set_tool_availability(
793 &self,
794 name: &str,
795 availability: ToolAvailability,
796 ) -> Result<u64> {
797 self.set_tool_availability_many(&[(name, availability)])
798 .await
799 }
800
801 async fn set_tool_availability_many<N: AsRef<str>>(
802 &self,
803 updates: &[(N, ToolAvailability)],
804 ) -> Result<u64> {
805 let mut state = self.tool_state().await?;
806 for (name, availability) in updates {
807 state
808 .set_availability(name.as_ref(), Some(*availability))
809 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
810 }
811 self.apply_tool_state(state).await
812 }
813
814 async fn clear_tool_availability_override(&self, name: &str) -> Result<u64> {
815 let mut state = self.tool_state().await?;
816 state
817 .set_availability(name, None)
818 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
819 self.apply_tool_state(state).await
820 }
821
822 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
823 Ok(self.tool_state().await?.tool_manifests())
824 }
825
826 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
827 let tool_registry = self.tool_registry().await?;
828 let handle = tool_registry
829 .add_tool_provider(provider)
830 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
831 self.refresh_tool_catalog().await?;
832 Ok(handle)
833 }
834
835 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
836 let tool_registry = self.tool_registry().await?;
837 let generation = tool_registry
838 .remove_source(handle)
839 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
840 self.refresh_tool_catalog().await?;
841 Ok(generation)
842 }
843
844 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
845 let writer = self.runtime.writer();
846 let runtime = writer.lock().await;
847 let lifecycle = runtime.session_lifecycle_service()?;
848 lifecycle.create_session(request).await.map_err(Into::into)
849 }
850
851 async fn close_child_session(&self, session_id: &str) -> Result<()> {
852 let writer = self.runtime.writer();
853 let runtime = writer.lock().await;
854 let lifecycle = runtime.session_lifecycle_service()?;
855 lifecycle
856 .close_session(session_id)
857 .await
858 .map_err(Into::into)
859 }
860
861 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
862 self.with_writer(async |runtime: &mut LashRuntime| {
863 runtime
864 .activate_managed_session(session_id)
865 .await
866 .map_err(Into::into)
867 })
868 .await
869 }
870
871 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
872 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
873 .await
874 }
875
876 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
877 for input in messages {
878 let source_key = input.id.map(|id| format!("injection:{id}"));
879 let turn_input = turn_input_from_plugin_message(input.message);
880 self.runtime
881 .enqueue_turn_input(
882 turn_input,
883 lash_core::DeliveryPolicy::EarliestSafeBoundary,
884 lash_core::SlotPolicy::Join,
885 source_key,
886 )
887 .await
888 .map(|_| ())
889 .map_err(EmbedError::Runtime)?;
890 }
891 Ok(())
892 }
893
894 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
895 self.runtime
896 .writer()
897 .lock()
898 .await
899 .plugin_session()
900 .map(|session| session.tool_registry())
901 .ok_or_else(|| {
902 EmbedError::Session(SessionError::Protocol(
903 "tool registry is unavailable in this runtime session".to_string(),
904 ))
905 })
906 }
907}
908
909fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
910 let mut input = TurnInput::empty();
911 if !message.content.is_empty() {
912 input.items.push(InputItem::Text {
913 text: message.content,
914 });
915 }
916 for (index, bytes) in message.images.into_iter().enumerate() {
917 let id = format!("injected-image-{index}");
918 input.items.push(InputItem::ImageRef { id: id.clone() });
919 input.image_blobs.insert(id, bytes);
920 }
921 input
922}
923
/// Facade over [`SessionAdmin`] for session configuration and prompt management.
#[derive(Clone)]
pub struct SessionConfigAdmin {
    control: SessionAdmin,
}

impl SessionConfigAdmin {
    /// Applies a configuration patch; delegates to `SessionAdmin::update_config`.
    pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
        self.control.update_config(patch).await
    }

    /// Updates provider, model and prompt layer individually; delegates to
    /// `SessionAdmin::update_session_config`.
    pub async fn update_session_config(
        &self,
        provider: Option<ProviderHandle>,
        model: Option<lash_core::ModelSpec>,
        prompt: Option<PromptLayer>,
    ) -> Result<()> {
        self.control
            .update_session_config(provider, model, prompt)
            .await
    }

    /// Installs `template` as the prompt template.
    pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
        self.control.set_prompt_template(template).await
    }

    /// Clears the prompt template.
    pub async fn clear_prompt_template(&self) -> Result<()> {
        self.control.clear_prompt_template().await
    }

    /// Adds one prompt contribution.
    pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
        self.control.add_prompt_contribution(contribution).await
    }

    /// Replaces the contents of `slot` with `contributions`.
    pub async fn replace_prompt_slot(
        &self,
        slot: PromptSlot,
        contributions: impl IntoIterator<Item = PromptContribution>,
    ) -> Result<()> {
        self.control.replace_prompt_slot(slot, contributions).await
    }

    /// Clears every contribution in `slot`.
    pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
        self.control.clear_prompt_slot(slot).await
    }
}
969
970#[derive(Clone)]
971pub struct ToolAdmin {
972 control: SessionAdmin,
973}
974
975impl ToolAdmin {
976 pub(crate) fn new(control: SessionAdmin) -> Self {
977 Self { control }
978 }
979}
980
981impl ToolAdmin {
982 pub async fn state(&self) -> Result<ToolState> {
983 self.control.tool_state().await
984 }
985
986 pub fn advanced(&self) -> AdvancedToolAdmin {
987 AdvancedToolAdmin {
988 control: self.control.clone(),
989 }
990 }
991
992 pub async fn set_availability(
993 &self,
994 name: impl AsRef<str>,
995 availability: ToolAvailability,
996 ) -> Result<u64> {
997 self.control
998 .set_tool_availability(name.as_ref(), availability)
999 .await
1000 }
1001
1002 pub async fn set_availability_many<N: AsRef<str>>(
1003 &self,
1004 updates: &[(N, ToolAvailability)],
1005 ) -> Result<u64> {
1006 self.control.set_tool_availability_many(updates).await
1007 }
1008
1009 pub async fn clear_availability_override(&self, name: impl AsRef<str>) -> Result<u64> {
1010 self.control
1011 .clear_tool_availability_override(name.as_ref())
1012 .await
1013 }
1014
1015 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1016 self.control.active_tool_manifests().await
1017 }
1018
1019 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1020 self.control.add_tool_provider(provider).await
1021 }
1022
1023 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1024 self.control.remove_tool_source(handle).await
1025 }
1026}
1027
/// Facade over [`SessionAdmin`] for applying or restoring a whole `ToolState`.
#[derive(Clone)]
pub struct AdvancedToolAdmin {
    control: SessionAdmin,
}

impl AdvancedToolAdmin {
    /// Applies `state`; delegates to `SessionAdmin::apply_tool_state`.
    pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
        self.control.apply_tool_state(state).await
    }

    /// Restores `state` and returns the restore report; delegates to
    /// `SessionAdmin::restore_tool_state`.
    pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
        self.control.restore_tool_state(state).await
    }
}
1059
1060#[derive(Clone)]
1061pub struct SessionCommandAdmin {
1062 control: SessionAdmin,
1063}
1064
1065impl SessionCommandAdmin {
1066 pub async fn refresh_tool_catalog(
1071 &self,
1072 reason: impl Into<String>,
1073 idempotency_key: impl Into<String>,
1074 ) -> Result<lash_core::SessionCommandReceipt> {
1075 self.control
1076 .submit_session_command(
1077 lash_core::SessionCommand::RefreshToolCatalog {
1078 reason: reason.into(),
1079 },
1080 idempotency_key,
1081 )
1082 .await
1083 }
1084
1085 pub async fn reset(
1086 &self,
1087 reason: impl Into<String>,
1088 idempotency_key: impl Into<String>,
1089 ) -> Result<lash_core::SessionCommandReceipt> {
1090 self.control
1091 .submit_session_command(
1092 lash_core::SessionCommand::ResetSession {
1093 reason: reason.into(),
1094 },
1095 idempotency_key,
1096 )
1097 .await
1098 }
1099}
1100
/// Facade over [`SessionAdmin`] for listing the session's trigger registrations.
#[derive(Clone)]
pub struct SessionTriggerAdmin {
    control: SessionAdmin,
}

impl SessionTriggerAdmin {
    /// Lists every trigger registration; delegates to
    /// `SessionAdmin::list_trigger_registrations`.
    pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
        self.control.list_trigger_registrations().await
    }

    /// Lists the trigger registrations for one source event type.
    pub async fn by_source_type(
        &self,
        source_type: impl Into<lash_core::TriggerEventType>,
    ) -> Result<Vec<lash_core::TriggerRegistration>> {
        self.control
            .trigger_registrations_by_source_type(source_type)
            .await
    }
}
1130
/// Facade over [`SessionAdmin`] for the processes of one session.
#[derive(Clone)]
pub struct SessionProcessAdmin {
    control: SessionAdmin,
}

impl SessionProcessAdmin {
    /// Creates the facade over `control`.
    pub(crate) fn new(control: SessionAdmin) -> Self {
        Self { control }
    }

    /// Starts a process for the session and returns its handle summary.
    pub async fn start(
        &self,
        request: lash_core::ProcessStartRequest,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessHandleSummary> {
        self.control
            .start_process(request, scoped_effect_controller)
            .await
    }

    /// Lists process handles; delegates to `SessionAdmin::list_process_handles`.
    pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
        self.control.list_process_handles().await
    }

    /// Lists all process handles; delegates to
    /// `SessionAdmin::list_all_process_handles`.
    pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
        self.control.list_all_process_handles().await
    }

    /// Waits for the session's background work; delegates to
    /// `SessionAdmin::await_background_work`.
    pub async fn await_all(&self) -> Result<()> {
        self.control.await_background_work().await
    }

    /// Cancels the process `process_id` and returns its cancel summary.
    pub async fn cancel(
        &self,
        process_id: &str,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<lash_core::ProcessCancelSummary> {
        self.control
            .cancel_process(process_id, scoped_effect_controller)
            .await
    }

    /// Cancels every process visible to the session; delegates to
    /// `SessionAdmin::cancel_visible_processes`.
    pub async fn cancel_all(
        &self,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
        self.control
            .cancel_visible_processes(scoped_effect_controller)
            .await
    }
}
1182
/// Facade over [`SessionAdmin`] for reading and mutating session state.
#[derive(Clone)]
pub struct SessionStateAdmin {
    control: SessionAdmin,
}

impl SessionStateAdmin {
    /// Exports the session snapshot; delegates to `SessionAdmin::export_state`.
    pub async fn export(&self) -> lash_core::SessionSnapshot {
        self.control.export_state().await
    }

    /// Appends `messages` to the session.
    pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
        self.control.append_messages(messages).await
    }

    /// Appends a plugin node with the given type and JSON body.
    pub async fn append_plugin_body(
        &self,
        plugin_type: impl Into<String>,
        body: serde_json::Value,
    ) -> Result<()> {
        self.control.append_plugin_body(plugin_type, body).await
    }

    /// Replaces the persisted state with `state`.
    pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
        self.control.set_persisted_state(state).await
    }

    /// Branches the session to `target_leaf` and returns the resulting snapshot.
    pub async fn branch_to_node(
        &self,
        target_leaf: Option<String>,
    ) -> Result<lash_core::SessionSnapshot> {
        self.control.branch_to_node(target_leaf).await
    }

    /// Returns the current persisted state; delegates to
    /// `SessionAdmin::persist_current_state`.
    pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
        self.control.persist_current_state().await
    }

    /// Returns the session state service of the runtime.
    pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
        self.control.session_state_service().await
    }

    /// Snapshots the execution state, if any, as raw bytes.
    pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
        self.control.snapshot_execution_state().await
    }

    /// Restores the execution state from `bytes`.
    pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
        self.control.restore_execution_state(bytes).await
    }

    /// Compacts the session context, optionally guided by `instructions`.
    pub async fn compact_context(
        &self,
        instructions: Option<String>,
        scoped_effect_controller: ScopedEffectController<'_>,
    ) -> Result<bool> {
        self.control
            .compact_context(instructions, scoped_effect_controller)
            .await
    }
}
1242
/// Facade over [`SessionAdmin`] for invoking typed plugin actions.
#[derive(Clone)]
pub struct PluginActions {
    pub(crate) control: SessionAdmin,
}

impl PluginActions {
    /// Calls the plugin action `Op` with `args` and returns its typed output;
    /// delegates to `SessionAdmin::call_plugin_action`.
    pub async fn call<Op: lash_core::PluginAction>(&self, args: Op::Args) -> Result<Op::Output> {
        self.control.call_plugin_action::<Op>(args).await
    }
}
1253
/// Facade over [`SessionAdmin`] for child and managed sessions.
#[derive(Clone)]
pub struct ChildSessionAdmin {
    control: SessionAdmin,
}

impl ChildSessionAdmin {
    /// Creates a child session; delegates to `SessionAdmin::create_child_session`.
    pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
        self.control.create_child_session(request).await
    }

    /// Closes the child session `session_id`.
    pub async fn close_session(&self, session_id: &str) -> Result<()> {
        self.control.close_child_session(session_id).await
    }

    /// Activates the managed session `session_id`.
    pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
        self.control.activate_managed_session(session_id).await
    }
}
1272
/// Facade over [`SessionAdmin`] for injecting turn input into the session.
#[derive(Clone)]
pub struct InjectionAdmin {
    control: SessionAdmin,
}

impl InjectionAdmin {
    /// Injects a single message, optionally tagged with `id`.
    pub async fn inject_turn_input(
        &self,
        id: Option<String>,
        message: PluginMessage,
    ) -> Result<()> {
        self.control.inject_turn_input(id, message).await
    }

    /// Injects several messages; delegates to `SessionAdmin::inject_turn_inputs`.
    pub async fn inject_turn_inputs(
        &self,
        messages: Vec<lash_core::InjectedTurnInput>,
    ) -> Result<()> {
        self.control.inject_turn_inputs(messages).await
    }
}
1294
/// Facade over [`SessionAdmin`] for protocol-level session extensions.
#[derive(Clone)]
pub struct ProtocolAdmin {
    control: SessionAdmin,
}

impl ProtocolAdmin {
    /// Applies `extension` to the session; delegates to
    /// `SessionAdmin::apply_protocol_session_extension`.
    pub async fn apply_session_extension(
        &self,
        extension: lash_core::ProtocolSessionExtensionHandle,
    ) -> Result<()> {
        self.control
            .apply_protocol_session_extension(extension)
            .await
    }
}