1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let drivers = self.core.work_driver.drivers().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 self.core.env.process_registry.clone(),
47 drivers.process,
48 );
49 router
50 .emit(request, scoped_effect_controller.controller())
51 .await
52 .map_err(Into::into)
53 }
54
55 pub async fn subscriptions(
56 &self,
57 filter: lash_core::TriggerSubscriptionFilter,
58 ) -> Result<Vec<lash_core::TriggerRegistration>> {
59 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60 EmbedError::Plugin(lash_core::PluginError::Session(
61 "trigger store is unavailable in this runtime".to_string(),
62 ))
63 })?;
64 let records = store.list_subscriptions(filter).await?;
65 Ok(records
66 .iter()
67 .map(lash_core::TriggerRegistration::from)
68 .collect())
69 }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74 pub(crate) core: LashCore,
75}
76
77impl Processes {
78 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79 self.core
80 .env
81 .process_registry
82 .as_ref()
83 .cloned()
84 .ok_or_else(|| {
85 EmbedError::Plugin(lash_core::PluginError::Session(
86 "process registry is unavailable in this runtime".to_string(),
87 ))
88 })
89 }
90
91 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93 }
94
95 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96 let effect_id = command.effect_id();
97 lash_core::RuntimeInvocation::effect(
98 lash_core::runtime::RuntimeScope::new("runtime"),
99 effect_id.clone(),
100 lash_core::RuntimeEffectKind::Process,
101 effect_id,
102 )
103 }
104
105 async fn run_command(
106 &self,
107 command: lash_core::ProcessCommand,
108 scoped_effect_controller: ScopedEffectController<'_>,
109 ) -> Result<lash_core::ProcessEffectOutcome> {
110 let registry = self.registry()?;
111 let invocation = Self::process_invocation(&command);
112 let outcome = scoped_effect_controller
113 .controller()
114 .execute_effect(
115 lash_core::RuntimeEffectEnvelope::new(
116 invocation,
117 lash_core::RuntimeEffectCommand::process(command),
118 ),
119 lash_core::RuntimeEffectLocalExecutor::processes(registry),
120 )
121 .await
122 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123 match outcome {
124 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126 "process effect returned non-process outcome".to_string(),
127 ))),
128 }
129 }
130
131 pub async fn start(
132 &self,
133 request: lash_core::ProcessStartRequest,
134 scoped_effect_controller: ScopedEffectController<'_>,
135 ) -> Result<lash_core::ProcessRecord> {
136 let env_ref = match request.env_spec.as_ref() {
137 Some(env_spec) => Some(
138 lash_core::runtime::persist_process_execution_env(
139 self.core.env.core.durability.process_env_store.as_ref(),
140 env_spec,
141 )
142 .await?,
143 ),
144 None => None,
145 };
146 let grant = request.grant.clone();
147 let registration = request.into_registration(env_ref);
148 let command = lash_core::ProcessCommand::Start {
149 registration,
150 grant,
151 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152 };
153 let outcome = self
154 .run_command(command, scoped_effect_controller.clone())
155 .await?;
156 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158 "process start returned the wrong outcome".to_string(),
159 )));
160 };
161 if let Some(driver) = self.core.work_driver.drivers().await.process {
162 driver.claim_and_run_pending("admin_process_start").await?;
163 }
164 Ok(record)
165 }
166
167 pub async fn list(
168 &self,
169 filter: &lash_core::ProcessListFilter,
170 ) -> Result<Vec<lash_core::ObservedProcess>> {
171 self.make_observer()?.list(filter).await.map_err(Into::into)
172 }
173
174 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175 Ok(self.make_observer()?.process(process_id).await)
176 }
177
178 pub async fn events(
179 &self,
180 process_id: &str,
181 after_sequence: u64,
182 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183 self.make_observer()?
184 .events_after(process_id, after_sequence)
185 .await
186 .map_err(Into::into)
187 }
188
189 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190 self.registry()?
191 .await_process(process_id)
192 .await
193 .map_err(Into::into)
194 }
195
196 pub async fn cancel(
197 &self,
198 process_id: &str,
199 scoped_effect_controller: ScopedEffectController<'_>,
200 ) -> Result<lash_core::ProcessCancelSummary> {
201 let command = lash_core::ProcessCommand::Cancel {
202 process_id: process_id.to_string(),
203 reason: Some("requested by host".to_string()),
204 };
205 let outcome = self
206 .run_command(command, scoped_effect_controller.clone())
207 .await?;
208 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210 "process cancel returned the wrong outcome".to_string(),
211 )));
212 };
213 Ok(lash_core::ProcessCancelSummary::from_record(record))
214 }
215
216 pub async fn signal(
217 &self,
218 process_id: &str,
219 signal_name: impl Into<String>,
220 signal_id: impl Into<String>,
221 request: lash_core::ProcessEventAppendRequest,
222 scoped_effect_controller: ScopedEffectController<'_>,
223 ) -> Result<lash_core::ProcessEvent> {
224 let signal_name = signal_name.into();
225 let event_type = request.event_type.clone();
226 let payload = request.payload.clone();
227 let command = lash_core::ProcessCommand::Signal {
228 process_id: process_id.to_string(),
229 signal_name: signal_name.clone(),
230 signal_id: signal_id.into(),
231 request,
232 };
233 let outcome = self
234 .run_command(command, scoped_effect_controller.clone())
235 .await?;
236 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238 "process signal returned the wrong outcome".to_string(),
239 )));
240 };
241 let registry = self.registry()?;
242 let waiting_ordinal =
243 registry
244 .get_process(process_id)
245 .await
246 .and_then(|record| match record.wait {
247 Some(lash_core::WaitState {
248 kind:
249 lash_core::WaitKind::Signal {
250 name,
251 event_type: wait_event_type,
252 ordinal,
253 ..
254 },
255 ..
256 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257 _ => None,
258 });
259 let ordinal = match waiting_ordinal {
260 Some(ordinal) => ordinal,
261 None => {
262 registry
263 .count_events_through(process_id, &event_type, event.sequence)
264 .await?
265 }
266 };
267 if ordinal > 0 {
268 let key = scoped_effect_controller
269 .controller()
270 .await_event_key(
271 &lash_core::ExecutionScope::process(process_id),
272 lash_core::AwaitEventWaitIdentity::process_signal(
273 process_id,
274 &signal_name,
275 ordinal,
276 ),
277 )
278 .await
279 .map_err(|err| {
280 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281 })?;
282 let _ = scoped_effect_controller
283 .controller()
284 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285 .await
286 .map_err(|err| {
287 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288 })?;
289 }
290 Ok(event)
291 }
292
293 pub async fn session_snapshot(
294 &self,
295 session_id: impl Into<String>,
296 ) -> Result<lash_core::ProcessWorkSnapshot> {
297 self.make_observer()?
298 .snapshot_for_session(session_id)
299 .await
300 .map_err(Into::into)
301 }
302
303 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304 self.make_observer()
305 }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310 pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314 pub fn config(&self) -> SessionConfigAdmin {
315 SessionConfigAdmin {
316 control: self.clone(),
317 }
318 }
319
320 pub fn tools(&self) -> ToolAdmin {
321 ToolAdmin {
322 control: self.clone(),
323 }
324 }
325
326 pub fn commands(&self) -> SessionCommandAdmin {
327 SessionCommandAdmin {
328 control: self.clone(),
329 }
330 }
331
332 pub fn triggers(&self) -> SessionTriggerAdmin {
333 SessionTriggerAdmin {
334 control: self.clone(),
335 }
336 }
337
338 pub fn state(&self) -> SessionStateAdmin {
339 SessionStateAdmin {
340 control: self.clone(),
341 }
342 }
343
344 pub fn children(&self) -> ChildSessionAdmin {
345 ChildSessionAdmin {
346 control: self.clone(),
347 }
348 }
349
350 pub fn injection(&self) -> InjectionAdmin {
351 InjectionAdmin {
352 control: self.clone(),
353 }
354 }
355
356 pub fn protocol(&self) -> ProtocolAdmin {
357 ProtocolAdmin {
358 control: self.clone(),
359 }
360 }
361
362 pub fn processes(&self) -> SessionProcessAdmin {
363 SessionProcessAdmin {
364 control: self.clone(),
365 }
366 }
367
368 async fn with_writer<F, T>(&self, f: F) -> T
373 where
374 F: AsyncFnOnce(&mut LashRuntime) -> T,
375 {
376 let writer = self.runtime.writer();
377 let mut runtime = writer.lock().await;
378 let value = f(&mut runtime).await;
379 self.runtime.publish_from(&runtime);
380 value
381 }
382
383 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384 self.update_session_config(patch.provider, patch.model, patch.prompt)
385 .await?;
386 Ok(())
387 }
388
389 async fn update_session_config(
390 &self,
391 provider: Option<ProviderHandle>,
392 model: Option<lash_core::ModelSpec>,
393 prompt: Option<PromptLayer>,
394 ) -> Result<()> {
395 self.with_writer(async |runtime: &mut LashRuntime| {
396 runtime.update_session_config(provider, model, prompt).await;
397 })
398 .await;
399 Ok(())
400 }
401
402 async fn export_state(&self) -> lash_core::SessionSnapshot {
403 self.runtime.observe().read_view.to_snapshot()
404 }
405
406 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407 self.with_writer(async |runtime: &mut LashRuntime| {
408 runtime
409 .append_session_nodes(lash_core::AppendSessionNodesRequest {
410 nodes: messages
411 .into_iter()
412 .map(lash_core::SessionAppendNode::message)
413 .collect(),
414 requires_ancestor_node_id: None,
415 })
416 .await
417 .map(|_| ())
418 .map_err(Into::into)
419 })
420 .await
421 }
422
423 async fn append_plugin_body(
424 &self,
425 plugin_type: impl Into<String>,
426 body: serde_json::Value,
427 ) -> Result<()> {
428 self.with_writer(async |runtime: &mut LashRuntime| {
429 runtime
430 .append_session_nodes(lash_core::AppendSessionNodesRequest {
431 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432 requires_ancestor_node_id: None,
433 })
434 .await
435 .map(|_| ())
436 .map_err(Into::into)
437 })
438 .await
439 }
440
441 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442 self.with_writer(async |runtime: &mut LashRuntime| {
443 runtime.set_persisted_state(state).map_err(Into::into)
444 })
445 .await
446 }
447
448 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449 self.with_writer(async |runtime: &mut LashRuntime| {
450 runtime.set_prompt_template(template).await;
451 })
452 .await;
453 Ok(())
454 }
455
456 async fn clear_prompt_template(&self) -> Result<()> {
457 self.with_writer(async |runtime: &mut LashRuntime| {
458 runtime.clear_prompt_template().await;
459 })
460 .await;
461 Ok(())
462 }
463
464 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465 self.with_writer(async |runtime: &mut LashRuntime| {
466 runtime.add_prompt_contribution(contribution).await;
467 })
468 .await;
469 Ok(())
470 }
471
472 async fn replace_prompt_slot(
473 &self,
474 slot: PromptSlot,
475 contributions: impl IntoIterator<Item = PromptContribution>,
476 ) -> Result<()> {
477 self.with_writer(async |runtime: &mut LashRuntime| {
478 runtime.replace_prompt_slot(slot, contributions).await;
479 })
480 .await;
481 Ok(())
482 }
483
484 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485 self.with_writer(async |runtime: &mut LashRuntime| {
486 runtime.clear_prompt_slot(slot).await;
487 })
488 .await;
489 Ok(())
490 }
491
492 async fn apply_protocol_session_extension(
493 &self,
494 extension: lash_core::ProtocolSessionExtensionHandle,
495 ) -> Result<()> {
496 self.with_writer(async |runtime: &mut LashRuntime| {
497 runtime
498 .apply_protocol_session_extension(extension)
499 .await
500 .map_err(Into::into)
501 })
502 .await
503 }
504
505 async fn branch_to_node(
506 &self,
507 target_leaf: Option<String>,
508 ) -> Result<lash_core::SessionSnapshot> {
509 self.with_writer(async |runtime: &mut LashRuntime| {
510 runtime
511 .branch_to_node(target_leaf)
512 .await
513 .map_err(Into::into)
514 })
515 .await
516 }
517
518 async fn await_background_work(&self) -> Result<()> {
519 self.with_writer(async |runtime: &mut LashRuntime| {
520 runtime.await_background_work().await.map_err(Into::into)
521 })
522 .await
523 }
524
525 async fn refresh_tool_catalog(&self) -> Result<()> {
526 self.with_writer(async |runtime: &mut LashRuntime| {
527 runtime
528 .refresh_session_tool_catalog()
529 .await
530 .map_err(Into::into)
531 })
532 .await
533 }
534
535 async fn submit_session_command(
536 &self,
537 command: lash_core::SessionCommand,
538 idempotency_key: impl Into<String>,
539 ) -> Result<lash_core::SessionCommandReceipt> {
540 let idempotency_key = idempotency_key.into();
541 self.with_writer(async |runtime: &mut LashRuntime| {
542 runtime
543 .submit_session_command(command, idempotency_key)
544 .await
545 .map_err(Into::into)
546 })
547 .await
548 }
549
550 async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551 self.with_writer(async |runtime: &mut LashRuntime| {
552 runtime
553 .list_trigger_registrations()
554 .await
555 .map_err(Into::into)
556 })
557 .await
558 }
559
560 async fn trigger_registrations_by_source_type(
561 &self,
562 source_type: impl Into<lash_core::TriggerEventType>,
563 ) -> Result<Vec<lash_core::TriggerRegistration>> {
564 self.with_writer(async |runtime: &mut LashRuntime| {
565 runtime
566 .trigger_registrations_by_source_type(source_type)
567 .await
568 .map_err(Into::into)
569 })
570 .await
571 }
572
573 async fn query_plugin_raw(
574 &self,
575 name: &str,
576 args: serde_json::Value,
577 ) -> Result<(String, serde_json::Value)> {
578 let observation = self.runtime.observe();
579 let session_id = observation.session_id().to_string();
580 observation
581 .query_plugin(name, args, Some(session_id))
582 .await
583 .map_err(Into::into)
584 }
585
586 async fn run_plugin_command_raw(
587 &self,
588 name: &str,
589 args: serde_json::Value,
590 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
591 let session_id = self.runtime.observe().session_id().to_string();
592 let writer = self.runtime.writer();
593 let mut runtime = writer.lock().await;
594 let receipt = runtime
595 .run_plugin_command(name, args, Some(session_id))
596 .await?;
597 self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
598 self.runtime.publish_from(&runtime);
599 Ok(receipt)
600 }
601
602 async fn run_plugin_task_raw_with_cancel(
603 &self,
604 name: &str,
605 args: serde_json::Value,
606 cancellation_token: CancellationToken,
607 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
608 let session_id = self.runtime.observe().session_id().to_string();
609 let writer = self.runtime.writer();
610 let mut runtime = writer.lock().await;
611 let scope_id = format!(
612 "{session_id}:plugin_task:{name}:{}",
613 lash_core::TurnActivityId::fresh().0
614 );
615 let scoped_effect_controller = runtime
616 .effect_host()
617 .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
618 .map_err(EmbedError::Runtime)?
619 .ok_or_else(|| {
620 EmbedError::Plugin(lash_core::PluginError::Session(
621 "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
622 ))
623 })?;
624 let receipt = runtime
625 .run_plugin_task(
626 name,
627 args,
628 Some(session_id),
629 scoped_effect_controller,
630 cancellation_token,
631 )
632 .await?;
633 self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
634 self.runtime.publish_from(&runtime);
635 Ok(receipt)
636 }
637
638 fn record_plugin_operation_observations(
639 &self,
640 events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
641 pending_turn_inputs: &[lash_core::PendingTurnInput],
642 ) {
643 for owned in events {
644 self.runtime
645 .record_turn_activity(lash_core::TurnActivity::independent(
646 lash_core::TurnEvent::PluginRuntime {
647 plugin_id: owned.plugin_id.clone(),
648 event: owned.value.clone(),
649 },
650 ));
651 }
652 if !pending_turn_inputs.is_empty() {
653 self.runtime.record_queue_changed(
654 lash_core::SessionQueueEventKind::Enqueued,
655 pending_turn_inputs
656 .iter()
657 .map(|input| input.input_id.clone())
658 .collect(),
659 );
660 }
661 }
662
663 async fn compact_context(
664 &self,
665 instructions: Option<String>,
666 scoped_effect_controller: ScopedEffectController<'_>,
667 ) -> Result<bool> {
668 self.with_writer(async |runtime: &mut LashRuntime| {
669 runtime
670 .compact_context(instructions, scoped_effect_controller)
671 .await
672 .map_err(Into::into)
673 })
674 .await
675 }
676
677 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
678 self.with_writer(async |runtime: &mut LashRuntime| {
679 runtime.await_background_work().await?;
680 Ok(runtime.export_persisted_state())
681 })
682 .await
683 }
684
685 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
686 Ok(self.runtime.observe().list_process_handles().await)
687 }
688
689 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
690 Ok(self.runtime.observe().list_all_process_handles().await)
691 }
692
693 async fn start_process(
694 &self,
695 request: lash_core::ProcessStartRequest,
696 scoped_effect_controller: ScopedEffectController<'_>,
697 ) -> Result<lash_core::ProcessHandleSummary> {
698 let writer = self.runtime.writer();
699 let runtime = writer.lock().await;
700 let session_id = runtime.session_id().to_string();
701 let processes = runtime.process_service()?;
702 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703 let summary = processes
704 .start_from_request(&session_id, request, scope)
705 .await
706 .map_err(EmbedError::Plugin)?;
707 self.runtime.record_process_changed(
708 SessionProcessEventKind::Started,
709 vec![summary.process_id.clone()],
710 );
711 Ok(summary)
712 }
713
714 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
715 self.runtime
716 .writer()
717 .lock()
718 .await
719 .session_state_service()
720 .map_err(Into::into)
721 }
722
723 async fn cancel_process(
724 &self,
725 process_id: &str,
726 scoped_effect_controller: ScopedEffectController<'_>,
727 ) -> Result<lash_core::ProcessCancelSummary> {
728 let writer = self.runtime.writer();
729 let runtime = writer.lock().await;
730 let session_id = runtime.session_id().to_string();
731 let processes = runtime.process_service()?;
732 let cancel_ability = runtime.process_cancel_ability();
733 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
734 let summary = cancel_ability
735 .cancel_summary(
736 processes.as_ref(),
737 lash_core::ProcessCancelRequest::new(
738 &session_id,
739 process_id,
740 scope,
741 lash_core::ProcessCancelSource::HostApi,
742 )
743 .with_reason("requested by host API"),
744 )
745 .await
746 .map_err(EmbedError::Plugin)?;
747 self.runtime.record_process_changed(
748 SessionProcessEventKind::Cancelled,
749 vec![summary.process_id.clone()],
750 );
751 Ok(summary)
752 }
753
754 async fn cancel_visible_processes(
755 &self,
756 scoped_effect_controller: ScopedEffectController<'_>,
757 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
758 let writer = self.runtime.writer();
759 let runtime = writer.lock().await;
760 let session_id = runtime.session_id().to_string();
761 let processes = runtime.process_service()?;
762 let cancel_ability = runtime.process_cancel_ability();
763 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
764 let summaries = cancel_ability
765 .cancel_all_visible(
766 processes.as_ref(),
767 lash_core::ProcessCancelAllRequest::new(
768 &session_id,
769 scope,
770 lash_core::ProcessCancelSource::HostApi,
771 )
772 .with_reason("requested by host API"),
773 )
774 .await
775 .map_err(EmbedError::Plugin)?;
776 self.runtime.record_process_changed(
777 SessionProcessEventKind::Cancelled,
778 summaries
779 .iter()
780 .map(|summary| summary.process_id.clone())
781 .collect(),
782 );
783 Ok(summaries)
784 }
785
786 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
787 self.with_writer(async |runtime: &mut LashRuntime| {
788 runtime.snapshot_execution_state().await.map_err(Into::into)
789 })
790 .await
791 }
792
793 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
794 self.with_writer(async |runtime: &mut LashRuntime| {
795 runtime
796 .restore_execution_state(bytes)
797 .await
798 .map_err(Into::into)
799 })
800 .await
801 }
802
803 async fn tool_state(&self) -> Result<ToolState> {
804 self.runtime.observe().tool_state.clone().ok_or_else(|| {
805 EmbedError::Session(SessionError::Protocol(
806 "runtime session not available".to_string(),
807 ))
808 })
809 }
810
811 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
812 self.with_writer(async |runtime: &mut LashRuntime| {
813 runtime
814 .apply_tool_state(state)
815 .await
816 .map_err(EmbedError::from)
817 })
818 .await
819 }
820
821 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
822 self.with_writer(async |runtime: &mut LashRuntime| {
823 runtime
824 .restore_tool_state(state)
825 .await
826 .map_err(EmbedError::from)
827 })
828 .await
829 }
830
831 async fn set_tool_membership(&self, tool_id: lash_core::ToolId, present: bool) -> Result<u64> {
832 self.set_tool_membership_many(&[(tool_id, present)]).await
833 }
834
835 async fn set_tool_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
836 let mut state = self.tool_state().await?;
837 for (tool_id, present) in updates {
838 state
839 .set_membership(tool_id, *present)
840 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
841 }
842 self.apply_tool_state(state).await
843 }
844
845 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
846 Ok(self.tool_state().await?.tool_manifests())
847 }
848
849 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
850 let tool_registry = self.tool_registry().await?;
851 let handle = tool_registry
852 .add_tool_provider(provider)
853 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
854 self.refresh_tool_catalog().await?;
855 Ok(handle)
856 }
857
858 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
859 let tool_registry = self.tool_registry().await?;
860 let generation = tool_registry
861 .remove_source(handle)
862 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
863 self.refresh_tool_catalog().await?;
864 Ok(generation)
865 }
866
867 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
868 let writer = self.runtime.writer();
869 let runtime = writer.lock().await;
870 let lifecycle = runtime.session_lifecycle_service()?;
871 lifecycle.create_session(request).await.map_err(Into::into)
872 }
873
874 async fn close_child_session(&self, session_id: &str) -> Result<()> {
875 let writer = self.runtime.writer();
876 let runtime = writer.lock().await;
877 let lifecycle = runtime.session_lifecycle_service()?;
878 lifecycle
879 .close_session(session_id)
880 .await
881 .map_err(Into::into)
882 }
883
884 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
885 self.with_writer(async |runtime: &mut LashRuntime| {
886 runtime
887 .activate_managed_session(session_id)
888 .await
889 .map_err(Into::into)
890 })
891 .await
892 }
893
894 async fn inject_turn_input(
895 &self,
896 turn_id: &str,
897 id: Option<String>,
898 message: PluginMessage,
899 ) -> Result<()> {
900 self.inject_turn_inputs_for_turn(
901 turn_id,
902 vec![lash_core::InjectedTurnInput { id, message }],
903 )
904 .await
905 }
906
907 async fn inject_turn_inputs_for_turn(
908 &self,
909 turn_id: &str,
910 messages: Vec<lash_core::InjectedTurnInput>,
911 ) -> Result<()> {
912 for input in messages {
913 let source_key = input.id.map(|id| format!("injection:{id}"));
914 let turn_input = turn_input_from_plugin_message(input.message);
915 self.runtime
916 .enqueue_turn_input(
917 turn_input,
918 lash_core::TurnInputIngress::active_turn(
919 turn_id,
920 lash_core::TurnInputCheckpointBoundary::AfterWork,
921 ),
922 source_key,
923 )
924 .await
925 .map(|_| ())
926 .map_err(EmbedError::Runtime)?;
927 }
928 Ok(())
929 }
930
931 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
932 self.runtime
933 .writer()
934 .lock()
935 .await
936 .plugin_session()
937 .map(|session| session.tool_registry())
938 .ok_or_else(|| {
939 EmbedError::Session(SessionError::Protocol(
940 "tool registry is unavailable in this runtime session".to_string(),
941 ))
942 })
943 }
944}
945
946fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
947 let mut input = TurnInput::empty();
948 if !message.content.is_empty() {
949 input.items.push(InputItem::Text {
950 text: message.content,
951 });
952 }
953 for (index, bytes) in message.images.into_iter().enumerate() {
954 let id = format!("injected-image-{index}");
955 input.items.push(InputItem::ImageRef { id: id.clone() });
956 input.image_blobs.insert(id, bytes);
957 }
958 input
959}
960
961#[derive(Clone)]
962pub struct SessionConfigAdmin {
963 control: SessionAdmin,
964}
965
966impl SessionConfigAdmin {
967 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
968 self.control.update_config(patch).await
969 }
970
971 pub async fn update_session_config(
972 &self,
973 provider: Option<ProviderHandle>,
974 model: Option<lash_core::ModelSpec>,
975 prompt: Option<PromptLayer>,
976 ) -> Result<()> {
977 self.control
978 .update_session_config(provider, model, prompt)
979 .await
980 }
981
982 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
983 self.control.set_prompt_template(template).await
984 }
985
986 pub async fn clear_prompt_template(&self) -> Result<()> {
987 self.control.clear_prompt_template().await
988 }
989
990 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
991 self.control.add_prompt_contribution(contribution).await
992 }
993
994 pub async fn replace_prompt_slot(
995 &self,
996 slot: PromptSlot,
997 contributions: impl IntoIterator<Item = PromptContribution>,
998 ) -> Result<()> {
999 self.control.replace_prompt_slot(slot, contributions).await
1000 }
1001
1002 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
1003 self.control.clear_prompt_slot(slot).await
1004 }
1005}
1006
1007#[derive(Clone)]
1008pub struct ToolAdmin {
1009 control: SessionAdmin,
1010}
1011
1012impl ToolAdmin {
1013 pub(crate) fn new(control: SessionAdmin) -> Self {
1014 Self { control }
1015 }
1016}
1017
1018impl ToolAdmin {
1019 pub async fn state(&self) -> Result<ToolState> {
1020 self.control.tool_state().await
1021 }
1022
1023 pub fn advanced(&self) -> AdvancedToolAdmin {
1024 AdvancedToolAdmin {
1025 control: self.control.clone(),
1026 }
1027 }
1028
1029 pub async fn set_membership(
1032 &self,
1033 tool_id: impl Into<lash_core::ToolId>,
1034 present: bool,
1035 ) -> Result<u64> {
1036 self.control
1037 .set_tool_membership(tool_id.into(), present)
1038 .await
1039 }
1040
1041 pub async fn set_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
1042 self.control.set_tool_membership_many(updates).await
1043 }
1044
1045 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1046 self.control.active_tool_manifests().await
1047 }
1048
1049 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1050 self.control.add_tool_provider(provider).await
1051 }
1052
1053 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1054 self.control.remove_tool_source(handle).await
1055 }
1056}
1057
1058#[derive(Clone)]
1059pub struct AdvancedToolAdmin {
1060 control: SessionAdmin,
1061}
1062
1063impl AdvancedToolAdmin {
1064 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1070 self.control.apply_tool_state(state).await
1071 }
1072
1073 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1086 self.control.restore_tool_state(state).await
1087 }
1088}
1089
1090#[derive(Clone)]
1091pub struct SessionCommandAdmin {
1092 control: SessionAdmin,
1093}
1094
1095impl SessionCommandAdmin {
1096 pub async fn refresh_tool_catalog(
1101 &self,
1102 reason: impl Into<String>,
1103 idempotency_key: impl Into<String>,
1104 ) -> Result<lash_core::SessionCommandReceipt> {
1105 self.control
1106 .submit_session_command(
1107 lash_core::SessionCommand::RefreshToolCatalog {
1108 reason: reason.into(),
1109 },
1110 idempotency_key,
1111 )
1112 .await
1113 }
1114
1115 pub async fn reset(
1116 &self,
1117 reason: impl Into<String>,
1118 idempotency_key: impl Into<String>,
1119 ) -> Result<lash_core::SessionCommandReceipt> {
1120 self.control
1121 .submit_session_command(
1122 lash_core::SessionCommand::ResetSession {
1123 reason: reason.into(),
1124 },
1125 idempotency_key,
1126 )
1127 .await
1128 }
1129}
1130
1131#[derive(Clone)]
1133pub struct SessionTriggerAdmin {
1134 control: SessionAdmin,
1135}
1136
1137impl SessionTriggerAdmin {
1138 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1144 self.control.list_trigger_registrations().await
1145 }
1146
1147 pub async fn by_source_type(
1152 &self,
1153 source_type: impl Into<lash_core::TriggerEventType>,
1154 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1155 self.control
1156 .trigger_registrations_by_source_type(source_type)
1157 .await
1158 }
1159}
1160
1161#[derive(Clone)]
1162pub struct SessionProcessAdmin {
1163 control: SessionAdmin,
1164}
1165
1166impl SessionProcessAdmin {
1167 pub(crate) fn new(control: SessionAdmin) -> Self {
1168 Self { control }
1169 }
1170
1171 pub async fn start(
1172 &self,
1173 request: lash_core::ProcessStartRequest,
1174 scoped_effect_controller: ScopedEffectController<'_>,
1175 ) -> Result<lash_core::ProcessHandleSummary> {
1176 self.control
1177 .start_process(request, scoped_effect_controller)
1178 .await
1179 }
1180
1181 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1182 self.control.list_process_handles().await
1183 }
1184
1185 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1186 self.control.list_all_process_handles().await
1187 }
1188
1189 pub async fn await_all(&self) -> Result<()> {
1190 self.control.await_background_work().await
1191 }
1192
1193 pub async fn cancel(
1194 &self,
1195 process_id: &str,
1196 scoped_effect_controller: ScopedEffectController<'_>,
1197 ) -> Result<lash_core::ProcessCancelSummary> {
1198 self.control
1199 .cancel_process(process_id, scoped_effect_controller)
1200 .await
1201 }
1202
1203 pub async fn cancel_all(
1204 &self,
1205 scoped_effect_controller: ScopedEffectController<'_>,
1206 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1207 self.control
1208 .cancel_visible_processes(scoped_effect_controller)
1209 .await
1210 }
1211}
1212
1213#[derive(Clone)]
1214pub struct SessionStateAdmin {
1215 control: SessionAdmin,
1216}
1217
1218impl SessionStateAdmin {
1219 pub async fn export(&self) -> lash_core::SessionSnapshot {
1220 self.control.export_state().await
1221 }
1222
1223 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1224 self.control.append_messages(messages).await
1225 }
1226
1227 pub async fn append_plugin_body(
1228 &self,
1229 plugin_type: impl Into<String>,
1230 body: serde_json::Value,
1231 ) -> Result<()> {
1232 self.control.append_plugin_body(plugin_type, body).await
1233 }
1234
1235 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1236 self.control.set_persisted_state(state).await
1237 }
1238
1239 pub async fn branch_to_node(
1240 &self,
1241 target_leaf: Option<String>,
1242 ) -> Result<lash_core::SessionSnapshot> {
1243 self.control.branch_to_node(target_leaf).await
1244 }
1245
1246 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1247 self.control.persist_current_state().await
1248 }
1249
1250 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1251 self.control.session_state_service().await
1252 }
1253
1254 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1255 self.control.snapshot_execution_state().await
1256 }
1257
1258 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1259 self.control.restore_execution_state(bytes).await
1260 }
1261
1262 pub async fn compact_context(
1263 &self,
1264 instructions: Option<String>,
1265 scoped_effect_controller: ScopedEffectController<'_>,
1266 ) -> Result<bool> {
1267 self.control
1268 .compact_context(instructions, scoped_effect_controller)
1269 .await
1270 }
1271}
1272
1273#[derive(Clone)]
1274pub struct PluginOperations {
1275 pub(crate) control: SessionAdmin,
1276}
1277
1278impl PluginOperations {
1279 pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1280 let (_plugin_id, output) = self
1281 .control
1282 .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1283 .await?;
1284 decode_plugin_output::<Op>(output)
1285 }
1286
1287 pub async fn query_raw(
1288 &self,
1289 name: &str,
1290 args: serde_json::Value,
1291 ) -> Result<(String, serde_json::Value)> {
1292 self.control.query_plugin_raw(name, args).await
1293 }
1294
1295 pub async fn run_command<Op: lash_core::PluginCommand>(
1296 &self,
1297 args: Op::Args,
1298 ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1299 let receipt = self
1300 .control
1301 .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1302 .await?;
1303 Ok(lash_core::PluginCommandReceipt {
1304 output: decode_plugin_output::<Op>(receipt.output)?,
1305 events: receipt.events,
1306 pending_turn_inputs: receipt.pending_turn_inputs,
1307 })
1308 }
1309
1310 pub async fn run_command_raw(
1311 &self,
1312 name: &str,
1313 args: serde_json::Value,
1314 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1315 self.control.run_plugin_command_raw(name, args).await
1316 }
1317
1318 pub async fn run_task<Op: lash_core::PluginTask>(
1319 &self,
1320 args: Op::Args,
1321 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1322 self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1323 .await
1324 }
1325
1326 pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1327 &self,
1328 args: Op::Args,
1329 cancellation_token: CancellationToken,
1330 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1331 let receipt = self
1332 .control
1333 .run_plugin_task_raw_with_cancel(
1334 Op::NAME,
1335 encode_plugin_args::<Op>(args)?,
1336 cancellation_token,
1337 )
1338 .await?;
1339 Ok(lash_core::PluginTaskReceipt {
1340 output: decode_plugin_output::<Op>(receipt.output)?,
1341 events: receipt.events,
1342 pending_turn_inputs: receipt.pending_turn_inputs,
1343 })
1344 }
1345
1346 pub async fn run_task_raw(
1347 &self,
1348 name: &str,
1349 args: serde_json::Value,
1350 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1351 self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1352 .await
1353 }
1354
1355 pub async fn run_task_raw_with_cancel(
1356 &self,
1357 name: &str,
1358 args: serde_json::Value,
1359 cancellation_token: CancellationToken,
1360 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1361 self.control
1362 .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1363 .await
1364 }
1365}
1366
1367fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1368 serde_json::to_value(args).map_err(|err| {
1369 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1370 "invalid {} args: {err}",
1371 Op::NAME
1372 )))
1373 })
1374}
1375
1376fn decode_plugin_output<Op: lash_core::PluginOperation>(
1377 output: serde_json::Value,
1378) -> Result<Op::Output> {
1379 serde_json::from_value(output).map_err(|err| {
1380 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1381 "invalid {} output: {err}",
1382 Op::NAME
1383 )))
1384 })
1385}
1386
1387#[derive(Clone)]
1388pub struct ChildSessionAdmin {
1389 control: SessionAdmin,
1390}
1391
1392impl ChildSessionAdmin {
1393 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1394 self.control.create_child_session(request).await
1395 }
1396
1397 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1398 self.control.close_child_session(session_id).await
1399 }
1400
1401 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1402 self.control.activate_managed_session(session_id).await
1403 }
1404}
1405
1406#[derive(Clone)]
1407pub struct InjectionAdmin {
1408 control: SessionAdmin,
1409}
1410
1411impl InjectionAdmin {
1412 pub async fn inject_turn_input(
1413 &self,
1414 turn_id: &str,
1415 id: Option<String>,
1416 message: PluginMessage,
1417 ) -> Result<()> {
1418 self.control.inject_turn_input(turn_id, id, message).await
1419 }
1420
1421 pub async fn inject_turn_inputs_for_turn(
1422 &self,
1423 turn_id: &str,
1424 messages: Vec<lash_core::InjectedTurnInput>,
1425 ) -> Result<()> {
1426 self.control
1427 .inject_turn_inputs_for_turn(turn_id, messages)
1428 .await
1429 }
1430}
1431
1432#[derive(Clone)]
1433pub struct ProtocolAdmin {
1434 control: SessionAdmin,
1435}
1436
1437impl ProtocolAdmin {
1438 pub async fn apply_session_extension(
1439 &self,
1440 extension: lash_core::ProtocolSessionExtensionHandle,
1441 ) -> Result<()> {
1442 self.control
1443 .apply_protocol_session_extension(extension)
1444 .await
1445 }
1446}