1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let drivers = self.core.work_driver.drivers().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 self.core.env.process_registry.clone(),
47 drivers.process,
48 );
49 router
50 .emit(request, scoped_effect_controller.controller())
51 .await
52 .map_err(Into::into)
53 }
54
55 pub async fn subscriptions(
56 &self,
57 filter: lash_core::TriggerSubscriptionFilter,
58 ) -> Result<Vec<lash_core::TriggerRegistration>> {
59 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60 EmbedError::Plugin(lash_core::PluginError::Session(
61 "trigger store is unavailable in this runtime".to_string(),
62 ))
63 })?;
64 let records = store.list_subscriptions(filter).await?;
65 Ok(records
66 .iter()
67 .map(lash_core::TriggerRegistration::from)
68 .collect())
69 }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74 pub(crate) core: LashCore,
75}
76
77impl Processes {
78 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79 self.core
80 .env
81 .process_registry
82 .as_ref()
83 .cloned()
84 .ok_or_else(|| {
85 EmbedError::Plugin(lash_core::PluginError::Session(
86 "process registry is unavailable in this runtime".to_string(),
87 ))
88 })
89 }
90
91 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93 }
94
95 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96 let effect_id = command.effect_id();
97 lash_core::RuntimeInvocation::effect(
98 lash_core::runtime::RuntimeScope::new("runtime"),
99 effect_id.clone(),
100 lash_core::RuntimeEffectKind::Process,
101 effect_id,
102 )
103 }
104
105 async fn run_command(
106 &self,
107 command: lash_core::ProcessCommand,
108 scoped_effect_controller: ScopedEffectController<'_>,
109 ) -> Result<lash_core::ProcessEffectOutcome> {
110 let registry = self.registry()?;
111 let invocation = Self::process_invocation(&command);
112 let outcome = scoped_effect_controller
113 .controller()
114 .execute_effect(
115 lash_core::RuntimeEffectEnvelope::new(
116 invocation,
117 lash_core::RuntimeEffectCommand::process(command),
118 ),
119 lash_core::RuntimeEffectLocalExecutor::processes(registry),
120 )
121 .await
122 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123 match outcome {
124 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126 "process effect returned non-process outcome".to_string(),
127 ))),
128 }
129 }
130
131 pub async fn start(
132 &self,
133 request: lash_core::ProcessStartRequest,
134 scoped_effect_controller: ScopedEffectController<'_>,
135 ) -> Result<lash_core::ProcessRecord> {
136 let env_ref = match request.env_spec.as_ref() {
137 Some(env_spec) => Some(
138 lash_core::runtime::persist_process_execution_env(
139 self.core.env.core.durability.process_env_store.as_ref(),
140 env_spec,
141 )
142 .await?,
143 ),
144 None => None,
145 };
146 let grant = request.grant.clone();
147 let registration = request.into_registration(env_ref);
148 let command = lash_core::ProcessCommand::Start {
149 registration,
150 grant,
151 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152 };
153 let outcome = self
154 .run_command(command, scoped_effect_controller.clone())
155 .await?;
156 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158 "process start returned the wrong outcome".to_string(),
159 )));
160 };
161 if let Some(driver) = self.core.work_driver.drivers().await.process {
162 driver.claim_and_run_pending("admin_process_start").await?;
163 }
164 Ok(record)
165 }
166
167 pub async fn list(
168 &self,
169 filter: &lash_core::ProcessListFilter,
170 ) -> Result<Vec<lash_core::ObservedProcess>> {
171 self.make_observer()?.list(filter).await.map_err(Into::into)
172 }
173
174 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175 Ok(self.make_observer()?.process(process_id).await)
176 }
177
178 pub async fn events(
179 &self,
180 process_id: &str,
181 after_sequence: u64,
182 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183 self.make_observer()?
184 .events_after(process_id, after_sequence)
185 .await
186 .map_err(Into::into)
187 }
188
189 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190 self.registry()?
191 .await_process(process_id)
192 .await
193 .map_err(Into::into)
194 }
195
196 pub async fn cancel(
197 &self,
198 process_id: &str,
199 scoped_effect_controller: ScopedEffectController<'_>,
200 ) -> Result<lash_core::ProcessCancelSummary> {
201 let command = lash_core::ProcessCommand::Cancel {
202 process_id: process_id.to_string(),
203 reason: Some("requested by host".to_string()),
204 };
205 let outcome = self
206 .run_command(command, scoped_effect_controller.clone())
207 .await?;
208 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210 "process cancel returned the wrong outcome".to_string(),
211 )));
212 };
213 Ok(lash_core::ProcessCancelSummary::from_record(record))
214 }
215
216 pub async fn signal(
217 &self,
218 process_id: &str,
219 signal_name: impl Into<String>,
220 signal_id: impl Into<String>,
221 request: lash_core::ProcessEventAppendRequest,
222 scoped_effect_controller: ScopedEffectController<'_>,
223 ) -> Result<lash_core::ProcessEvent> {
224 let signal_name = signal_name.into();
225 let event_type = request.event_type.clone();
226 let payload = request.payload.clone();
227 let command = lash_core::ProcessCommand::Signal {
228 process_id: process_id.to_string(),
229 signal_name: signal_name.clone(),
230 signal_id: signal_id.into(),
231 request,
232 };
233 let outcome = self
234 .run_command(command, scoped_effect_controller.clone())
235 .await?;
236 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238 "process signal returned the wrong outcome".to_string(),
239 )));
240 };
241 let registry = self.registry()?;
242 let waiting_ordinal =
243 registry
244 .get_process(process_id)
245 .await
246 .and_then(|record| match record.wait {
247 Some(lash_core::WaitState {
248 kind:
249 lash_core::WaitKind::Signal {
250 name,
251 event_type: wait_event_type,
252 ordinal,
253 ..
254 },
255 ..
256 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257 _ => None,
258 });
259 let ordinal = match waiting_ordinal {
260 Some(ordinal) => ordinal,
261 None => {
262 registry
263 .count_events_through(process_id, &event_type, event.sequence)
264 .await?
265 }
266 };
267 if ordinal > 0 {
268 let key = scoped_effect_controller
269 .controller()
270 .await_event_key(
271 &lash_core::ExecutionScope::process(process_id),
272 lash_core::AwaitEventWaitIdentity::process_signal(
273 process_id,
274 &signal_name,
275 ordinal,
276 ),
277 )
278 .await
279 .map_err(|err| {
280 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281 })?;
282 let _ = scoped_effect_controller
283 .controller()
284 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285 .await
286 .map_err(|err| {
287 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288 })?;
289 }
290 Ok(event)
291 }
292
293 pub async fn session_snapshot(
294 &self,
295 session_id: impl Into<String>,
296 ) -> Result<lash_core::ProcessWorkSnapshot> {
297 self.make_observer()?
298 .snapshot_for_session(session_id)
299 .await
300 .map_err(Into::into)
301 }
302
303 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304 self.make_observer()
305 }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310 pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314 pub fn config(&self) -> SessionConfigAdmin {
315 SessionConfigAdmin {
316 control: self.clone(),
317 }
318 }
319
320 pub fn tools(&self) -> ToolAdmin {
321 ToolAdmin {
322 control: self.clone(),
323 }
324 }
325
326 pub fn commands(&self) -> SessionCommandAdmin {
327 SessionCommandAdmin {
328 control: self.clone(),
329 }
330 }
331
332 pub fn triggers(&self) -> SessionTriggerAdmin {
333 SessionTriggerAdmin {
334 control: self.clone(),
335 }
336 }
337
338 pub fn state(&self) -> SessionStateAdmin {
339 SessionStateAdmin {
340 control: self.clone(),
341 }
342 }
343
344 pub fn children(&self) -> ChildSessionAdmin {
345 ChildSessionAdmin {
346 control: self.clone(),
347 }
348 }
349
350 pub fn injection(&self) -> InjectionAdmin {
351 InjectionAdmin {
352 control: self.clone(),
353 }
354 }
355
356 pub fn protocol(&self) -> ProtocolAdmin {
357 ProtocolAdmin {
358 control: self.clone(),
359 }
360 }
361
362 pub fn processes(&self) -> SessionProcessAdmin {
363 SessionProcessAdmin {
364 control: self.clone(),
365 }
366 }
367
368 async fn with_writer<F, T>(&self, f: F) -> T
373 where
374 F: AsyncFnOnce(&mut LashRuntime) -> T,
375 {
376 let writer = self.runtime.writer();
377 let mut runtime = writer.lock().await;
378 let value = f(&mut runtime).await;
379 self.runtime.publish_from(&runtime);
380 value
381 }
382
383 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384 self.update_session_config(patch.provider, patch.model, patch.prompt)
385 .await?;
386 Ok(())
387 }
388
389 async fn update_session_config(
390 &self,
391 provider: Option<ProviderHandle>,
392 model: Option<lash_core::ModelSpec>,
393 prompt: Option<PromptLayer>,
394 ) -> Result<()> {
395 self.with_writer(async |runtime: &mut LashRuntime| {
396 runtime.update_session_config(provider, model, prompt).await;
397 })
398 .await;
399 Ok(())
400 }
401
402 async fn export_state(&self) -> lash_core::SessionSnapshot {
403 self.runtime.observe().read_view.to_snapshot()
404 }
405
406 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407 self.with_writer(async |runtime: &mut LashRuntime| {
408 runtime
409 .append_session_nodes(lash_core::AppendSessionNodesRequest {
410 nodes: messages
411 .into_iter()
412 .map(lash_core::SessionAppendNode::message)
413 .collect(),
414 requires_ancestor_node_id: None,
415 })
416 .await
417 .map(|_| ())
418 .map_err(Into::into)
419 })
420 .await
421 }
422
423 async fn append_plugin_body(
424 &self,
425 plugin_type: impl Into<String>,
426 body: serde_json::Value,
427 ) -> Result<()> {
428 self.with_writer(async |runtime: &mut LashRuntime| {
429 runtime
430 .append_session_nodes(lash_core::AppendSessionNodesRequest {
431 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432 requires_ancestor_node_id: None,
433 })
434 .await
435 .map(|_| ())
436 .map_err(Into::into)
437 })
438 .await
439 }
440
441 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442 self.with_writer(async |runtime: &mut LashRuntime| {
443 runtime.set_persisted_state(state).map_err(Into::into)
444 })
445 .await
446 }
447
448 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449 self.with_writer(async |runtime: &mut LashRuntime| {
450 runtime.set_prompt_template(template).await;
451 })
452 .await;
453 Ok(())
454 }
455
456 async fn clear_prompt_template(&self) -> Result<()> {
457 self.with_writer(async |runtime: &mut LashRuntime| {
458 runtime.clear_prompt_template().await;
459 })
460 .await;
461 Ok(())
462 }
463
464 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465 self.with_writer(async |runtime: &mut LashRuntime| {
466 runtime.add_prompt_contribution(contribution).await;
467 })
468 .await;
469 Ok(())
470 }
471
472 async fn replace_prompt_slot(
473 &self,
474 slot: PromptSlot,
475 contributions: impl IntoIterator<Item = PromptContribution>,
476 ) -> Result<()> {
477 self.with_writer(async |runtime: &mut LashRuntime| {
478 runtime.replace_prompt_slot(slot, contributions).await;
479 })
480 .await;
481 Ok(())
482 }
483
484 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485 self.with_writer(async |runtime: &mut LashRuntime| {
486 runtime.clear_prompt_slot(slot).await;
487 })
488 .await;
489 Ok(())
490 }
491
492 async fn apply_protocol_session_extension(
493 &self,
494 extension: lash_core::ProtocolSessionExtensionHandle,
495 ) -> Result<()> {
496 self.with_writer(async |runtime: &mut LashRuntime| {
497 runtime
498 .apply_protocol_session_extension(extension)
499 .await
500 .map_err(Into::into)
501 })
502 .await
503 }
504
505 async fn branch_to_node(
506 &self,
507 target_leaf: Option<String>,
508 ) -> Result<lash_core::SessionSnapshot> {
509 self.with_writer(async |runtime: &mut LashRuntime| {
510 runtime
511 .branch_to_node(target_leaf)
512 .await
513 .map_err(Into::into)
514 })
515 .await
516 }
517
518 async fn await_background_work(&self) -> Result<()> {
519 self.with_writer(async |runtime: &mut LashRuntime| {
520 runtime.await_background_work().await.map_err(Into::into)
521 })
522 .await
523 }
524
525 async fn refresh_tool_catalog(&self) -> Result<()> {
526 self.with_writer(async |runtime: &mut LashRuntime| {
527 runtime
528 .refresh_session_tool_catalog()
529 .await
530 .map_err(Into::into)
531 })
532 .await
533 }
534
535 async fn submit_session_command(
536 &self,
537 command: lash_core::SessionCommand,
538 idempotency_key: impl Into<String>,
539 ) -> Result<lash_core::SessionCommandReceipt> {
540 let idempotency_key = idempotency_key.into();
541 self.with_writer(async |runtime: &mut LashRuntime| {
542 runtime
543 .submit_session_command(command, idempotency_key)
544 .await
545 .map_err(Into::into)
546 })
547 .await
548 }
549
550 async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551 self.with_writer(async |runtime: &mut LashRuntime| {
552 runtime
553 .list_trigger_registrations()
554 .await
555 .map_err(Into::into)
556 })
557 .await
558 }
559
560 async fn trigger_registrations_by_source_type(
561 &self,
562 source_type: impl Into<lash_core::TriggerEventType>,
563 ) -> Result<Vec<lash_core::TriggerRegistration>> {
564 self.with_writer(async |runtime: &mut LashRuntime| {
565 runtime
566 .trigger_registrations_by_source_type(source_type)
567 .await
568 .map_err(Into::into)
569 })
570 .await
571 }
572
573 async fn query_plugin_raw(
574 &self,
575 name: &str,
576 args: serde_json::Value,
577 ) -> Result<(String, serde_json::Value)> {
578 let observation = self.runtime.observe();
579 let session_id = observation.session_id().to_string();
580 observation
581 .query_plugin(name, args, Some(session_id))
582 .await
583 .map_err(Into::into)
584 }
585
586 async fn run_plugin_command_raw(
587 &self,
588 name: &str,
589 args: serde_json::Value,
590 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
591 let session_id = self.runtime.observe().session_id().to_string();
592 let writer = self.runtime.writer();
593 let mut runtime = writer.lock().await;
594 let receipt = runtime
595 .run_plugin_command(name, args, Some(session_id))
596 .await?;
597 self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
598 self.runtime.publish_from(&runtime);
599 Ok(receipt)
600 }
601
602 async fn run_plugin_task_raw_with_cancel(
603 &self,
604 name: &str,
605 args: serde_json::Value,
606 cancellation_token: CancellationToken,
607 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
608 let session_id = self.runtime.observe().session_id().to_string();
609 let writer = self.runtime.writer();
610 let mut runtime = writer.lock().await;
611 let scope_id = format!(
612 "{session_id}:plugin_task:{name}:{}",
613 lash_core::TurnActivityId::fresh().0
614 );
615 let scoped_effect_controller = runtime
616 .effect_host()
617 .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
618 .map_err(EmbedError::Runtime)?
619 .ok_or_else(|| {
620 EmbedError::Plugin(lash_core::PluginError::Session(
621 "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
622 ))
623 })?;
624 let receipt = runtime
625 .run_plugin_task(
626 name,
627 args,
628 Some(session_id),
629 scoped_effect_controller,
630 cancellation_token,
631 )
632 .await?;
633 self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
634 self.runtime.publish_from(&runtime);
635 Ok(receipt)
636 }
637
638 fn record_plugin_operation_observations(
639 &self,
640 events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
641 queued_batches: &[lash_core::runtime::QueuedWorkBatch],
642 ) {
643 for owned in events {
644 self.runtime
645 .record_turn_activity(lash_core::TurnActivity::independent(
646 lash_core::TurnEvent::PluginRuntime {
647 plugin_id: owned.plugin_id.clone(),
648 event: owned.value.clone(),
649 },
650 ));
651 }
652 if !queued_batches.is_empty() {
653 self.runtime.record_queue_changed(
654 lash_core::SessionQueueEventKind::Enqueued,
655 queued_batches
656 .iter()
657 .map(|batch| batch.batch_id.clone())
658 .collect(),
659 );
660 }
661 }
662
663 async fn compact_context(
664 &self,
665 instructions: Option<String>,
666 scoped_effect_controller: ScopedEffectController<'_>,
667 ) -> Result<bool> {
668 self.with_writer(async |runtime: &mut LashRuntime| {
669 runtime
670 .compact_context(instructions, scoped_effect_controller)
671 .await
672 .map_err(Into::into)
673 })
674 .await
675 }
676
677 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
678 self.with_writer(async |runtime: &mut LashRuntime| {
679 runtime.await_background_work().await?;
680 Ok(runtime.export_persisted_state())
681 })
682 .await
683 }
684
685 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
686 Ok(self.runtime.observe().list_process_handles().await)
687 }
688
689 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
690 Ok(self.runtime.observe().list_all_process_handles().await)
691 }
692
693 async fn start_process(
694 &self,
695 request: lash_core::ProcessStartRequest,
696 scoped_effect_controller: ScopedEffectController<'_>,
697 ) -> Result<lash_core::ProcessHandleSummary> {
698 let writer = self.runtime.writer();
699 let runtime = writer.lock().await;
700 let session_id = runtime.session_id().to_string();
701 let processes = runtime.process_service()?;
702 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703 let summary = processes
704 .start_from_request(&session_id, request, scope)
705 .await
706 .map_err(EmbedError::Plugin)?;
707 self.runtime.record_process_changed(
708 SessionProcessEventKind::Started,
709 vec![summary.process_id.clone()],
710 );
711 Ok(summary)
712 }
713
714 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
715 self.runtime
716 .writer()
717 .lock()
718 .await
719 .session_state_service()
720 .map_err(Into::into)
721 }
722
723 async fn cancel_process(
724 &self,
725 process_id: &str,
726 scoped_effect_controller: ScopedEffectController<'_>,
727 ) -> Result<lash_core::ProcessCancelSummary> {
728 let writer = self.runtime.writer();
729 let runtime = writer.lock().await;
730 let session_id = runtime.session_id().to_string();
731 let processes = runtime.process_service()?;
732 let cancel_ability = runtime.process_cancel_ability();
733 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
734 let summary = cancel_ability
735 .cancel_summary(
736 processes.as_ref(),
737 lash_core::ProcessCancelRequest::new(
738 &session_id,
739 process_id,
740 scope,
741 lash_core::ProcessCancelSource::HostApi,
742 )
743 .with_reason("requested by host API"),
744 )
745 .await
746 .map_err(EmbedError::Plugin)?;
747 self.runtime.record_process_changed(
748 SessionProcessEventKind::Cancelled,
749 vec![summary.process_id.clone()],
750 );
751 Ok(summary)
752 }
753
754 async fn cancel_visible_processes(
755 &self,
756 scoped_effect_controller: ScopedEffectController<'_>,
757 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
758 let writer = self.runtime.writer();
759 let runtime = writer.lock().await;
760 let session_id = runtime.session_id().to_string();
761 let processes = runtime.process_service()?;
762 let cancel_ability = runtime.process_cancel_ability();
763 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
764 let summaries = cancel_ability
765 .cancel_all_visible(
766 processes.as_ref(),
767 lash_core::ProcessCancelAllRequest::new(
768 &session_id,
769 scope,
770 lash_core::ProcessCancelSource::HostApi,
771 )
772 .with_reason("requested by host API"),
773 )
774 .await
775 .map_err(EmbedError::Plugin)?;
776 self.runtime.record_process_changed(
777 SessionProcessEventKind::Cancelled,
778 summaries
779 .iter()
780 .map(|summary| summary.process_id.clone())
781 .collect(),
782 );
783 Ok(summaries)
784 }
785
786 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
787 self.with_writer(async |runtime: &mut LashRuntime| {
788 runtime.snapshot_execution_state().await.map_err(Into::into)
789 })
790 .await
791 }
792
793 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
794 self.with_writer(async |runtime: &mut LashRuntime| {
795 runtime
796 .restore_execution_state(bytes)
797 .await
798 .map_err(Into::into)
799 })
800 .await
801 }
802
803 async fn tool_state(&self) -> Result<ToolState> {
804 self.runtime.observe().tool_state.clone().ok_or_else(|| {
805 EmbedError::Session(SessionError::Protocol(
806 "runtime session not available".to_string(),
807 ))
808 })
809 }
810
811 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
812 self.with_writer(async |runtime: &mut LashRuntime| {
813 runtime
814 .apply_tool_state(state)
815 .await
816 .map_err(EmbedError::from)
817 })
818 .await
819 }
820
821 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
822 self.with_writer(async |runtime: &mut LashRuntime| {
823 runtime
824 .restore_tool_state(state)
825 .await
826 .map_err(EmbedError::from)
827 })
828 .await
829 }
830
831 async fn set_tool_availability(
832 &self,
833 tool_id: lash_core::ToolId,
834 availability: ToolAvailability,
835 ) -> Result<u64> {
836 self.set_tool_availability_many(&[(tool_id, availability)])
837 .await
838 }
839
840 async fn set_tool_availability_many(
841 &self,
842 updates: &[(lash_core::ToolId, ToolAvailability)],
843 ) -> Result<u64> {
844 let mut state = self.tool_state().await?;
845 for (tool_id, availability) in updates {
846 state
847 .set_availability(tool_id, Some(*availability))
848 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
849 }
850 self.apply_tool_state(state).await
851 }
852
853 async fn clear_tool_availability_override(&self, tool_id: lash_core::ToolId) -> Result<u64> {
854 let mut state = self.tool_state().await?;
855 state
856 .set_availability(&tool_id, None)
857 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
858 self.apply_tool_state(state).await
859 }
860
861 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
862 Ok(self.tool_state().await?.tool_manifests())
863 }
864
865 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
866 let tool_registry = self.tool_registry().await?;
867 let handle = tool_registry
868 .add_tool_provider(provider)
869 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
870 self.refresh_tool_catalog().await?;
871 Ok(handle)
872 }
873
874 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
875 let tool_registry = self.tool_registry().await?;
876 let generation = tool_registry
877 .remove_source(handle)
878 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
879 self.refresh_tool_catalog().await?;
880 Ok(generation)
881 }
882
883 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
884 let writer = self.runtime.writer();
885 let runtime = writer.lock().await;
886 let lifecycle = runtime.session_lifecycle_service()?;
887 lifecycle.create_session(request).await.map_err(Into::into)
888 }
889
890 async fn close_child_session(&self, session_id: &str) -> Result<()> {
891 let writer = self.runtime.writer();
892 let runtime = writer.lock().await;
893 let lifecycle = runtime.session_lifecycle_service()?;
894 lifecycle
895 .close_session(session_id)
896 .await
897 .map_err(Into::into)
898 }
899
900 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
901 self.with_writer(async |runtime: &mut LashRuntime| {
902 runtime
903 .activate_managed_session(session_id)
904 .await
905 .map_err(Into::into)
906 })
907 .await
908 }
909
910 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
911 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
912 .await
913 }
914
915 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
916 for input in messages {
917 let source_key = input.id.map(|id| format!("injection:{id}"));
918 let turn_input = turn_input_from_plugin_message(input.message);
919 self.runtime
920 .enqueue_turn_input(
921 turn_input,
922 lash_core::DeliveryPolicy::EarliestSafeBoundary,
923 lash_core::SlotPolicy::Join,
924 source_key,
925 )
926 .await
927 .map(|_| ())
928 .map_err(EmbedError::Runtime)?;
929 }
930 Ok(())
931 }
932
933 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
934 self.runtime
935 .writer()
936 .lock()
937 .await
938 .plugin_session()
939 .map(|session| session.tool_registry())
940 .ok_or_else(|| {
941 EmbedError::Session(SessionError::Protocol(
942 "tool registry is unavailable in this runtime session".to_string(),
943 ))
944 })
945 }
946}
947
948fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
949 let mut input = TurnInput::empty();
950 if !message.content.is_empty() {
951 input.items.push(InputItem::Text {
952 text: message.content,
953 });
954 }
955 for (index, bytes) in message.images.into_iter().enumerate() {
956 let id = format!("injected-image-{index}");
957 input.items.push(InputItem::ImageRef { id: id.clone() });
958 input.image_blobs.insert(id, bytes);
959 }
960 input
961}
962
963#[derive(Clone)]
964pub struct SessionConfigAdmin {
965 control: SessionAdmin,
966}
967
968impl SessionConfigAdmin {
969 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
970 self.control.update_config(patch).await
971 }
972
973 pub async fn update_session_config(
974 &self,
975 provider: Option<ProviderHandle>,
976 model: Option<lash_core::ModelSpec>,
977 prompt: Option<PromptLayer>,
978 ) -> Result<()> {
979 self.control
980 .update_session_config(provider, model, prompt)
981 .await
982 }
983
984 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
985 self.control.set_prompt_template(template).await
986 }
987
988 pub async fn clear_prompt_template(&self) -> Result<()> {
989 self.control.clear_prompt_template().await
990 }
991
992 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
993 self.control.add_prompt_contribution(contribution).await
994 }
995
996 pub async fn replace_prompt_slot(
997 &self,
998 slot: PromptSlot,
999 contributions: impl IntoIterator<Item = PromptContribution>,
1000 ) -> Result<()> {
1001 self.control.replace_prompt_slot(slot, contributions).await
1002 }
1003
1004 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
1005 self.control.clear_prompt_slot(slot).await
1006 }
1007}
1008
1009#[derive(Clone)]
1010pub struct ToolAdmin {
1011 control: SessionAdmin,
1012}
1013
1014impl ToolAdmin {
1015 pub(crate) fn new(control: SessionAdmin) -> Self {
1016 Self { control }
1017 }
1018}
1019
1020impl ToolAdmin {
1021 pub async fn state(&self) -> Result<ToolState> {
1022 self.control.tool_state().await
1023 }
1024
1025 pub fn advanced(&self) -> AdvancedToolAdmin {
1026 AdvancedToolAdmin {
1027 control: self.control.clone(),
1028 }
1029 }
1030
1031 pub async fn set_availability(
1032 &self,
1033 tool_id: impl Into<lash_core::ToolId>,
1034 availability: ToolAvailability,
1035 ) -> Result<u64> {
1036 self.control
1037 .set_tool_availability(tool_id.into(), availability)
1038 .await
1039 }
1040
1041 pub async fn set_availability_many(
1042 &self,
1043 updates: &[(lash_core::ToolId, ToolAvailability)],
1044 ) -> Result<u64> {
1045 self.control.set_tool_availability_many(updates).await
1046 }
1047
1048 pub async fn clear_availability_override(
1049 &self,
1050 tool_id: impl Into<lash_core::ToolId>,
1051 ) -> Result<u64> {
1052 self.control
1053 .clear_tool_availability_override(tool_id.into())
1054 .await
1055 }
1056
1057 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1058 self.control.active_tool_manifests().await
1059 }
1060
1061 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1062 self.control.add_tool_provider(provider).await
1063 }
1064
1065 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1066 self.control.remove_tool_source(handle).await
1067 }
1068}
1069
1070#[derive(Clone)]
1071pub struct AdvancedToolAdmin {
1072 control: SessionAdmin,
1073}
1074
1075impl AdvancedToolAdmin {
1076 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1082 self.control.apply_tool_state(state).await
1083 }
1084
1085 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1098 self.control.restore_tool_state(state).await
1099 }
1100}
1101
1102#[derive(Clone)]
1103pub struct SessionCommandAdmin {
1104 control: SessionAdmin,
1105}
1106
1107impl SessionCommandAdmin {
1108 pub async fn refresh_tool_catalog(
1113 &self,
1114 reason: impl Into<String>,
1115 idempotency_key: impl Into<String>,
1116 ) -> Result<lash_core::SessionCommandReceipt> {
1117 self.control
1118 .submit_session_command(
1119 lash_core::SessionCommand::RefreshToolCatalog {
1120 reason: reason.into(),
1121 },
1122 idempotency_key,
1123 )
1124 .await
1125 }
1126
1127 pub async fn reset(
1128 &self,
1129 reason: impl Into<String>,
1130 idempotency_key: impl Into<String>,
1131 ) -> Result<lash_core::SessionCommandReceipt> {
1132 self.control
1133 .submit_session_command(
1134 lash_core::SessionCommand::ResetSession {
1135 reason: reason.into(),
1136 },
1137 idempotency_key,
1138 )
1139 .await
1140 }
1141}
1142
1143#[derive(Clone)]
1145pub struct SessionTriggerAdmin {
1146 control: SessionAdmin,
1147}
1148
1149impl SessionTriggerAdmin {
1150 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1156 self.control.list_trigger_registrations().await
1157 }
1158
1159 pub async fn by_source_type(
1164 &self,
1165 source_type: impl Into<lash_core::TriggerEventType>,
1166 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1167 self.control
1168 .trigger_registrations_by_source_type(source_type)
1169 .await
1170 }
1171}
1172
1173#[derive(Clone)]
1174pub struct SessionProcessAdmin {
1175 control: SessionAdmin,
1176}
1177
1178impl SessionProcessAdmin {
1179 pub(crate) fn new(control: SessionAdmin) -> Self {
1180 Self { control }
1181 }
1182
1183 pub async fn start(
1184 &self,
1185 request: lash_core::ProcessStartRequest,
1186 scoped_effect_controller: ScopedEffectController<'_>,
1187 ) -> Result<lash_core::ProcessHandleSummary> {
1188 self.control
1189 .start_process(request, scoped_effect_controller)
1190 .await
1191 }
1192
1193 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1194 self.control.list_process_handles().await
1195 }
1196
1197 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1198 self.control.list_all_process_handles().await
1199 }
1200
1201 pub async fn await_all(&self) -> Result<()> {
1202 self.control.await_background_work().await
1203 }
1204
1205 pub async fn cancel(
1206 &self,
1207 process_id: &str,
1208 scoped_effect_controller: ScopedEffectController<'_>,
1209 ) -> Result<lash_core::ProcessCancelSummary> {
1210 self.control
1211 .cancel_process(process_id, scoped_effect_controller)
1212 .await
1213 }
1214
1215 pub async fn cancel_all(
1216 &self,
1217 scoped_effect_controller: ScopedEffectController<'_>,
1218 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1219 self.control
1220 .cancel_visible_processes(scoped_effect_controller)
1221 .await
1222 }
1223}
1224
1225#[derive(Clone)]
1226pub struct SessionStateAdmin {
1227 control: SessionAdmin,
1228}
1229
1230impl SessionStateAdmin {
1231 pub async fn export(&self) -> lash_core::SessionSnapshot {
1232 self.control.export_state().await
1233 }
1234
1235 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1236 self.control.append_messages(messages).await
1237 }
1238
1239 pub async fn append_plugin_body(
1240 &self,
1241 plugin_type: impl Into<String>,
1242 body: serde_json::Value,
1243 ) -> Result<()> {
1244 self.control.append_plugin_body(plugin_type, body).await
1245 }
1246
1247 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1248 self.control.set_persisted_state(state).await
1249 }
1250
1251 pub async fn branch_to_node(
1252 &self,
1253 target_leaf: Option<String>,
1254 ) -> Result<lash_core::SessionSnapshot> {
1255 self.control.branch_to_node(target_leaf).await
1256 }
1257
1258 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1259 self.control.persist_current_state().await
1260 }
1261
1262 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1263 self.control.session_state_service().await
1264 }
1265
1266 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1267 self.control.snapshot_execution_state().await
1268 }
1269
1270 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1271 self.control.restore_execution_state(bytes).await
1272 }
1273
1274 pub async fn compact_context(
1275 &self,
1276 instructions: Option<String>,
1277 scoped_effect_controller: ScopedEffectController<'_>,
1278 ) -> Result<bool> {
1279 self.control
1280 .compact_context(instructions, scoped_effect_controller)
1281 .await
1282 }
1283}
1284
1285#[derive(Clone)]
1286pub struct PluginOperations {
1287 pub(crate) control: SessionAdmin,
1288}
1289
1290impl PluginOperations {
1291 pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1292 let (_plugin_id, output) = self
1293 .control
1294 .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1295 .await?;
1296 decode_plugin_output::<Op>(output)
1297 }
1298
1299 pub async fn query_raw(
1300 &self,
1301 name: &str,
1302 args: serde_json::Value,
1303 ) -> Result<(String, serde_json::Value)> {
1304 self.control.query_plugin_raw(name, args).await
1305 }
1306
1307 pub async fn run_command<Op: lash_core::PluginCommand>(
1308 &self,
1309 args: Op::Args,
1310 ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1311 let receipt = self
1312 .control
1313 .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1314 .await?;
1315 Ok(lash_core::PluginCommandReceipt {
1316 output: decode_plugin_output::<Op>(receipt.output)?,
1317 events: receipt.events,
1318 queued_batches: receipt.queued_batches,
1319 })
1320 }
1321
1322 pub async fn run_command_raw(
1323 &self,
1324 name: &str,
1325 args: serde_json::Value,
1326 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1327 self.control.run_plugin_command_raw(name, args).await
1328 }
1329
1330 pub async fn run_task<Op: lash_core::PluginTask>(
1331 &self,
1332 args: Op::Args,
1333 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1334 self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1335 .await
1336 }
1337
1338 pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1339 &self,
1340 args: Op::Args,
1341 cancellation_token: CancellationToken,
1342 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1343 let receipt = self
1344 .control
1345 .run_plugin_task_raw_with_cancel(
1346 Op::NAME,
1347 encode_plugin_args::<Op>(args)?,
1348 cancellation_token,
1349 )
1350 .await?;
1351 Ok(lash_core::PluginTaskReceipt {
1352 output: decode_plugin_output::<Op>(receipt.output)?,
1353 events: receipt.events,
1354 queued_batches: receipt.queued_batches,
1355 })
1356 }
1357
1358 pub async fn run_task_raw(
1359 &self,
1360 name: &str,
1361 args: serde_json::Value,
1362 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1363 self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1364 .await
1365 }
1366
1367 pub async fn run_task_raw_with_cancel(
1368 &self,
1369 name: &str,
1370 args: serde_json::Value,
1371 cancellation_token: CancellationToken,
1372 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1373 self.control
1374 .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1375 .await
1376 }
1377}
1378
1379fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1380 serde_json::to_value(args).map_err(|err| {
1381 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1382 "invalid {} args: {err}",
1383 Op::NAME
1384 )))
1385 })
1386}
1387
1388fn decode_plugin_output<Op: lash_core::PluginOperation>(
1389 output: serde_json::Value,
1390) -> Result<Op::Output> {
1391 serde_json::from_value(output).map_err(|err| {
1392 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1393 "invalid {} output: {err}",
1394 Op::NAME
1395 )))
1396 })
1397}
1398
1399#[derive(Clone)]
1400pub struct ChildSessionAdmin {
1401 control: SessionAdmin,
1402}
1403
1404impl ChildSessionAdmin {
1405 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1406 self.control.create_child_session(request).await
1407 }
1408
1409 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1410 self.control.close_child_session(session_id).await
1411 }
1412
1413 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1414 self.control.activate_managed_session(session_id).await
1415 }
1416}
1417
1418#[derive(Clone)]
1419pub struct InjectionAdmin {
1420 control: SessionAdmin,
1421}
1422
1423impl InjectionAdmin {
1424 pub async fn inject_turn_input(
1425 &self,
1426 id: Option<String>,
1427 message: PluginMessage,
1428 ) -> Result<()> {
1429 self.control.inject_turn_input(id, message).await
1430 }
1431
1432 pub async fn inject_turn_inputs(
1433 &self,
1434 messages: Vec<lash_core::InjectedTurnInput>,
1435 ) -> Result<()> {
1436 self.control.inject_turn_inputs(messages).await
1437 }
1438}
1439
1440#[derive(Clone)]
1441pub struct ProtocolAdmin {
1442 control: SessionAdmin,
1443}
1444
1445impl ProtocolAdmin {
1446 pub async fn apply_session_extension(
1447 &self,
1448 extension: lash_core::ProtocolSessionExtensionHandle,
1449 ) -> Result<()> {
1450 self.control
1451 .apply_protocol_session_extension(extension)
1452 .await
1453 }
1454}