1pub use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let drivers = self.core.work_driver.drivers().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 self.core.env.process_registry.clone(),
47 drivers.process,
48 );
49 router
50 .emit(request, scoped_effect_controller.controller())
51 .await
52 .map_err(Into::into)
53 }
54
55 pub async fn subscriptions(
56 &self,
57 filter: lash_core::TriggerSubscriptionFilter,
58 ) -> Result<Vec<lash_core::TriggerRegistration>> {
59 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60 EmbedError::Plugin(lash_core::PluginError::Session(
61 "trigger store is unavailable in this runtime".to_string(),
62 ))
63 })?;
64 let records = store.list_subscriptions(filter).await?;
65 Ok(records
66 .iter()
67 .map(lash_core::TriggerRegistration::from)
68 .collect())
69 }
70}
71
72#[derive(Clone)]
73pub struct Processes {
74 pub(crate) core: LashCore,
75}
76
77impl Processes {
78 fn registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
79 self.core
80 .env
81 .process_registry
82 .as_ref()
83 .cloned()
84 .ok_or_else(|| {
85 EmbedError::Plugin(lash_core::PluginError::Session(
86 "process registry is unavailable in this runtime".to_string(),
87 ))
88 })
89 }
90
91 fn make_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
92 Ok(lash_core::ProcessWorkObserver::new(self.registry()?))
93 }
94
95 fn process_invocation(command: &lash_core::ProcessCommand) -> lash_core::RuntimeInvocation {
96 let effect_id = command.effect_id();
97 lash_core::RuntimeInvocation::effect(
98 lash_core::runtime::RuntimeScope::new("runtime"),
99 effect_id.clone(),
100 lash_core::RuntimeEffectKind::Process,
101 effect_id,
102 )
103 }
104
105 async fn run_command(
106 &self,
107 command: lash_core::ProcessCommand,
108 scoped_effect_controller: ScopedEffectController<'_>,
109 ) -> Result<lash_core::ProcessEffectOutcome> {
110 let registry = self.registry()?;
111 let invocation = Self::process_invocation(&command);
112 let outcome = scoped_effect_controller
113 .controller()
114 .execute_effect(
115 lash_core::RuntimeEffectEnvelope::new(
116 invocation,
117 lash_core::RuntimeEffectCommand::process(command),
118 ),
119 lash_core::RuntimeEffectLocalExecutor::processes(registry),
120 )
121 .await
122 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))?;
123 match outcome {
124 lash_core::RuntimeEffectOutcome::Process { result } => Ok(result),
125 _ => Err(EmbedError::Plugin(lash_core::PluginError::Session(
126 "process effect returned non-process outcome".to_string(),
127 ))),
128 }
129 }
130
131 pub async fn start(
132 &self,
133 request: lash_core::ProcessStartRequest,
134 scoped_effect_controller: ScopedEffectController<'_>,
135 ) -> Result<lash_core::ProcessRecord> {
136 let env_ref = match request.env_spec.as_ref() {
137 Some(env_spec) => Some(
138 lash_core::runtime::persist_process_execution_env(
139 self.core.env.core.durability.process_env_store.as_ref(),
140 env_spec,
141 )
142 .await?,
143 ),
144 None => None,
145 };
146 let grant = request.grant.clone();
147 let registration = request.into_registration(env_ref);
148 let command = lash_core::ProcessCommand::Start {
149 registration,
150 grant,
151 execution_context: Box::new(lash_core::ProcessExecutionContext::default()),
152 };
153 let outcome = self
154 .run_command(command, scoped_effect_controller.clone())
155 .await?;
156 let lash_core::ProcessEffectOutcome::Start { record } = outcome else {
157 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
158 "process start returned the wrong outcome".to_string(),
159 )));
160 };
161 if let Some(driver) = self.core.work_driver.drivers().await.process {
162 driver.claim_and_run_pending("admin_process_start").await?;
163 }
164 Ok(record)
165 }
166
167 pub async fn list(
168 &self,
169 filter: &lash_core::ProcessListFilter,
170 ) -> Result<Vec<lash_core::ObservedProcess>> {
171 self.make_observer()?.list(filter).await.map_err(Into::into)
172 }
173
174 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
175 Ok(self.make_observer()?.process(process_id).await)
176 }
177
178 pub async fn events(
179 &self,
180 process_id: &str,
181 after_sequence: u64,
182 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
183 self.make_observer()?
184 .events_after(process_id, after_sequence)
185 .await
186 .map_err(Into::into)
187 }
188
189 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
190 self.registry()?
191 .await_process(process_id)
192 .await
193 .map_err(Into::into)
194 }
195
196 pub async fn cancel(
197 &self,
198 process_id: &str,
199 scoped_effect_controller: ScopedEffectController<'_>,
200 ) -> Result<lash_core::ProcessCancelSummary> {
201 let command = lash_core::ProcessCommand::Cancel {
202 process_id: process_id.to_string(),
203 reason: Some("requested by host".to_string()),
204 };
205 let outcome = self
206 .run_command(command, scoped_effect_controller.clone())
207 .await?;
208 let lash_core::ProcessEffectOutcome::Cancel { record } = outcome else {
209 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
210 "process cancel returned the wrong outcome".to_string(),
211 )));
212 };
213 Ok(lash_core::ProcessCancelSummary::from_record(record))
214 }
215
216 pub async fn signal(
217 &self,
218 process_id: &str,
219 signal_name: impl Into<String>,
220 signal_id: impl Into<String>,
221 request: lash_core::ProcessEventAppendRequest,
222 scoped_effect_controller: ScopedEffectController<'_>,
223 ) -> Result<lash_core::ProcessEvent> {
224 let signal_name = signal_name.into();
225 let event_type = request.event_type.clone();
226 let payload = request.payload.clone();
227 let command = lash_core::ProcessCommand::Signal {
228 process_id: process_id.to_string(),
229 signal_name: signal_name.clone(),
230 signal_id: signal_id.into(),
231 request,
232 };
233 let outcome = self
234 .run_command(command, scoped_effect_controller.clone())
235 .await?;
236 let lash_core::ProcessEffectOutcome::Signal { event } = outcome else {
237 return Err(EmbedError::Plugin(lash_core::PluginError::Session(
238 "process signal returned the wrong outcome".to_string(),
239 )));
240 };
241 let registry = self.registry()?;
242 let waiting_ordinal =
243 registry
244 .get_process(process_id)
245 .await
246 .and_then(|record| match record.wait {
247 Some(lash_core::WaitState {
248 kind:
249 lash_core::WaitKind::Signal {
250 name,
251 event_type: wait_event_type,
252 ordinal,
253 ..
254 },
255 ..
256 }) if name == signal_name && wait_event_type == event_type => Some(ordinal),
257 _ => None,
258 });
259 let ordinal = match waiting_ordinal {
260 Some(ordinal) => ordinal,
261 None => {
262 registry
263 .count_events_through(process_id, &event_type, event.sequence)
264 .await?
265 }
266 };
267 if ordinal > 0 {
268 let key = scoped_effect_controller
269 .controller()
270 .await_event_key(
271 &lash_core::ExecutionScope::process(process_id),
272 lash_core::AwaitEventWaitIdentity::process_signal(
273 process_id,
274 &signal_name,
275 ordinal,
276 ),
277 )
278 .await
279 .map_err(|err| {
280 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
281 })?;
282 let _ = scoped_effect_controller
283 .controller()
284 .resolve_await_event(&key, lash_core::Resolution::Ok(payload))
285 .await
286 .map_err(|err| {
287 EmbedError::Plugin(lash_core::PluginError::Session(err.to_string()))
288 })?;
289 }
290 Ok(event)
291 }
292
293 pub async fn session_snapshot(
294 &self,
295 session_id: impl Into<String>,
296 ) -> Result<lash_core::ProcessWorkSnapshot> {
297 self.make_observer()?
298 .snapshot_for_session(session_id)
299 .await
300 .map_err(Into::into)
301 }
302
303 pub fn observer(&self) -> Result<lash_core::ProcessWorkObserver> {
304 self.make_observer()
305 }
306}
307
308#[derive(Clone)]
309pub struct SessionAdmin {
310 pub(crate) runtime: RuntimeHandle,
311}
312
313impl SessionAdmin {
314 pub fn config(&self) -> SessionConfigAdmin {
315 SessionConfigAdmin {
316 control: self.clone(),
317 }
318 }
319
320 pub fn tools(&self) -> ToolAdmin {
321 ToolAdmin {
322 control: self.clone(),
323 }
324 }
325
326 pub fn commands(&self) -> SessionCommandAdmin {
327 SessionCommandAdmin {
328 control: self.clone(),
329 }
330 }
331
332 pub fn triggers(&self) -> SessionTriggerAdmin {
333 SessionTriggerAdmin {
334 control: self.clone(),
335 }
336 }
337
338 pub fn state(&self) -> SessionStateAdmin {
339 SessionStateAdmin {
340 control: self.clone(),
341 }
342 }
343
344 pub fn children(&self) -> ChildSessionAdmin {
345 ChildSessionAdmin {
346 control: self.clone(),
347 }
348 }
349
350 pub fn injection(&self) -> InjectionAdmin {
351 InjectionAdmin {
352 control: self.clone(),
353 }
354 }
355
356 pub fn protocol(&self) -> ProtocolAdmin {
357 ProtocolAdmin {
358 control: self.clone(),
359 }
360 }
361
362 pub fn processes(&self) -> SessionProcessAdmin {
363 SessionProcessAdmin {
364 control: self.clone(),
365 }
366 }
367
368 async fn with_writer<F, T>(&self, f: F) -> T
373 where
374 F: AsyncFnOnce(&mut LashRuntime) -> T,
375 {
376 let writer = self.runtime.writer();
377 let mut runtime = writer.lock().await;
378 let value = f(&mut runtime).await;
379 self.runtime.publish_from(&runtime);
380 value
381 }
382
383 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
384 self.update_session_config(patch.provider, patch.model, patch.prompt)
385 .await?;
386 Ok(())
387 }
388
389 async fn update_session_config(
390 &self,
391 provider: Option<ProviderHandle>,
392 model: Option<lash_core::ModelSpec>,
393 prompt: Option<PromptLayer>,
394 ) -> Result<()> {
395 self.with_writer(async |runtime: &mut LashRuntime| {
396 runtime.update_session_config(provider, model, prompt).await;
397 })
398 .await;
399 Ok(())
400 }
401
402 async fn export_state(&self) -> lash_core::SessionSnapshot {
403 self.runtime.observe().read_view.to_snapshot()
404 }
405
406 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
407 self.with_writer(async |runtime: &mut LashRuntime| {
408 runtime
409 .append_session_nodes(lash_core::AppendSessionNodesRequest {
410 nodes: messages
411 .into_iter()
412 .map(lash_core::SessionAppendNode::message)
413 .collect(),
414 requires_ancestor_node_id: None,
415 })
416 .await
417 .map(|_| ())
418 .map_err(Into::into)
419 })
420 .await
421 }
422
423 async fn append_plugin_body(
424 &self,
425 plugin_type: impl Into<String>,
426 body: serde_json::Value,
427 ) -> Result<()> {
428 self.with_writer(async |runtime: &mut LashRuntime| {
429 runtime
430 .append_session_nodes(lash_core::AppendSessionNodesRequest {
431 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
432 requires_ancestor_node_id: None,
433 })
434 .await
435 .map(|_| ())
436 .map_err(Into::into)
437 })
438 .await
439 }
440
441 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
442 self.with_writer(async |runtime: &mut LashRuntime| {
443 runtime.set_persisted_state(state).map_err(Into::into)
444 })
445 .await
446 }
447
448 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
449 self.with_writer(async |runtime: &mut LashRuntime| {
450 runtime.set_prompt_template(template).await;
451 })
452 .await;
453 Ok(())
454 }
455
456 async fn clear_prompt_template(&self) -> Result<()> {
457 self.with_writer(async |runtime: &mut LashRuntime| {
458 runtime.clear_prompt_template().await;
459 })
460 .await;
461 Ok(())
462 }
463
464 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
465 self.with_writer(async |runtime: &mut LashRuntime| {
466 runtime.add_prompt_contribution(contribution).await;
467 })
468 .await;
469 Ok(())
470 }
471
472 async fn replace_prompt_slot(
473 &self,
474 slot: PromptSlot,
475 contributions: impl IntoIterator<Item = PromptContribution>,
476 ) -> Result<()> {
477 self.with_writer(async |runtime: &mut LashRuntime| {
478 runtime.replace_prompt_slot(slot, contributions).await;
479 })
480 .await;
481 Ok(())
482 }
483
484 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
485 self.with_writer(async |runtime: &mut LashRuntime| {
486 runtime.clear_prompt_slot(slot).await;
487 })
488 .await;
489 Ok(())
490 }
491
492 async fn apply_protocol_session_extension(
493 &self,
494 extension: lash_core::ProtocolSessionExtensionHandle,
495 ) -> Result<()> {
496 self.with_writer(async |runtime: &mut LashRuntime| {
497 runtime
498 .apply_protocol_session_extension(extension)
499 .await
500 .map_err(Into::into)
501 })
502 .await
503 }
504
505 async fn branch_to_node(
506 &self,
507 target_leaf: Option<String>,
508 ) -> Result<lash_core::SessionSnapshot> {
509 self.with_writer(async |runtime: &mut LashRuntime| {
510 runtime
511 .branch_to_node(target_leaf)
512 .await
513 .map_err(Into::into)
514 })
515 .await
516 }
517
518 async fn await_background_work(&self) -> Result<()> {
519 self.with_writer(async |runtime: &mut LashRuntime| {
520 runtime.await_background_work().await.map_err(Into::into)
521 })
522 .await
523 }
524
525 async fn refresh_tool_catalog(&self) -> Result<()> {
526 self.with_writer(async |runtime: &mut LashRuntime| {
527 runtime
528 .refresh_session_tool_catalog()
529 .await
530 .map_err(Into::into)
531 })
532 .await
533 }
534
535 async fn submit_session_command(
536 &self,
537 command: lash_core::SessionCommand,
538 idempotency_key: impl Into<String>,
539 ) -> Result<lash_core::SessionCommandReceipt> {
540 let idempotency_key = idempotency_key.into();
541 self.with_writer(async |runtime: &mut LashRuntime| {
542 runtime
543 .submit_session_command(command, idempotency_key)
544 .await
545 .map_err(Into::into)
546 })
547 .await
548 }
549
550 async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
551 self.with_writer(async |runtime: &mut LashRuntime| {
552 runtime
553 .list_trigger_registrations()
554 .await
555 .map_err(Into::into)
556 })
557 .await
558 }
559
560 async fn trigger_registrations_by_source_type(
561 &self,
562 source_type: impl Into<lash_core::TriggerEventType>,
563 ) -> Result<Vec<lash_core::TriggerRegistration>> {
564 self.with_writer(async |runtime: &mut LashRuntime| {
565 runtime
566 .trigger_registrations_by_source_type(source_type)
567 .await
568 .map_err(Into::into)
569 })
570 .await
571 }
572
573 async fn query_plugin_raw(
574 &self,
575 name: &str,
576 args: serde_json::Value,
577 ) -> Result<(String, serde_json::Value)> {
578 let observation = self.runtime.observe();
579 let session_id = observation.session_id().to_string();
580 observation
581 .query_plugin(name, args, Some(session_id))
582 .await
583 .map_err(Into::into)
584 }
585
586 async fn run_plugin_command_raw(
587 &self,
588 name: &str,
589 args: serde_json::Value,
590 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
591 let session_id = self.runtime.observe().session_id().to_string();
592 let writer = self.runtime.writer();
593 let mut runtime = writer.lock().await;
594 let receipt = runtime
595 .run_plugin_command(name, args, Some(session_id))
596 .await?;
597 self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
598 self.runtime.publish_from(&runtime);
599 Ok(receipt)
600 }
601
602 async fn run_plugin_task_raw_with_cancel(
603 &self,
604 name: &str,
605 args: serde_json::Value,
606 cancellation_token: CancellationToken,
607 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
608 let session_id = self.runtime.observe().session_id().to_string();
609 let writer = self.runtime.writer();
610 let mut runtime = writer.lock().await;
611 let scope_id = format!(
612 "{session_id}:plugin_task:{name}:{}",
613 lash_core::TurnActivityId::fresh().0
614 );
615 let scoped_effect_controller = runtime
616 .effect_host()
617 .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
618 .map_err(EmbedError::Runtime)?
619 .ok_or_else(|| {
620 EmbedError::Plugin(lash_core::PluginError::Session(
621 "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
622 ))
623 })?;
624 let receipt = runtime
625 .run_plugin_task(
626 name,
627 args,
628 Some(session_id),
629 scoped_effect_controller,
630 cancellation_token,
631 )
632 .await?;
633 self.record_plugin_operation_observations(&receipt.events, &receipt.queued_batches);
634 self.runtime.publish_from(&runtime);
635 Ok(receipt)
636 }
637
638 fn record_plugin_operation_observations(
639 &self,
640 events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
641 queued_batches: &[lash_core::runtime::QueuedWorkBatch],
642 ) {
643 for owned in events {
644 self.runtime
645 .record_turn_activity(lash_core::TurnActivity::independent(
646 lash_core::TurnEvent::PluginRuntime {
647 plugin_id: owned.plugin_id.clone(),
648 event: owned.value.clone(),
649 },
650 ));
651 }
652 if !queued_batches.is_empty() {
653 self.runtime.record_queue_changed(
654 lash_core::SessionQueueEventKind::Enqueued,
655 queued_batches
656 .iter()
657 .map(|batch| batch.batch_id.clone())
658 .collect(),
659 );
660 }
661 }
662
663 async fn compact_context(
664 &self,
665 instructions: Option<String>,
666 scoped_effect_controller: ScopedEffectController<'_>,
667 ) -> Result<bool> {
668 self.with_writer(async |runtime: &mut LashRuntime| {
669 runtime
670 .compact_context(instructions, scoped_effect_controller)
671 .await
672 .map_err(Into::into)
673 })
674 .await
675 }
676
677 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
678 self.with_writer(async |runtime: &mut LashRuntime| {
679 runtime.await_background_work().await?;
680 Ok(runtime.export_persisted_state())
681 })
682 .await
683 }
684
685 async fn list_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
686 Ok(self.runtime.observe().list_process_handles().await)
687 }
688
689 async fn list_all_process_handles(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
690 Ok(self.runtime.observe().list_all_process_handles().await)
691 }
692
693 async fn start_process(
694 &self,
695 request: lash_core::ProcessStartRequest,
696 scoped_effect_controller: ScopedEffectController<'_>,
697 ) -> Result<lash_core::ProcessHandleSummary> {
698 let writer = self.runtime.writer();
699 let runtime = writer.lock().await;
700 let session_id = runtime.session_id().to_string();
701 let processes = runtime.process_service()?;
702 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
703 let summary = processes
704 .start_from_request(&session_id, request, scope)
705 .await
706 .map_err(EmbedError::Plugin)?;
707 self.runtime.record_process_changed(
708 SessionProcessEventKind::Started,
709 vec![summary.process_id.clone()],
710 );
711 Ok(summary)
712 }
713
714 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
715 self.runtime
716 .writer()
717 .lock()
718 .await
719 .session_state_service()
720 .map_err(Into::into)
721 }
722
723 async fn cancel_process(
724 &self,
725 process_id: &str,
726 scoped_effect_controller: ScopedEffectController<'_>,
727 ) -> Result<lash_core::ProcessCancelSummary> {
728 let writer = self.runtime.writer();
729 let runtime = writer.lock().await;
730 let session_id = runtime.session_id().to_string();
731 let processes = runtime.process_service()?;
732 let cancel_ability = runtime.process_cancel_ability();
733 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
734 let summary = cancel_ability
735 .cancel_summary(
736 processes.as_ref(),
737 lash_core::ProcessCancelRequest::new(
738 &session_id,
739 process_id,
740 scope,
741 lash_core::ProcessCancelSource::HostApi,
742 )
743 .with_reason("requested by host API"),
744 )
745 .await
746 .map_err(EmbedError::Plugin)?;
747 self.runtime.record_process_changed(
748 SessionProcessEventKind::Cancelled,
749 vec![summary.process_id.clone()],
750 );
751 Ok(summary)
752 }
753
754 async fn cancel_visible_processes(
755 &self,
756 scoped_effect_controller: ScopedEffectController<'_>,
757 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
758 let writer = self.runtime.writer();
759 let runtime = writer.lock().await;
760 let session_id = runtime.session_id().to_string();
761 let processes = runtime.process_service()?;
762 let cancel_ability = runtime.process_cancel_ability();
763 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
764 let summaries = cancel_ability
765 .cancel_all_visible(
766 processes.as_ref(),
767 lash_core::ProcessCancelAllRequest::new(
768 &session_id,
769 scope,
770 lash_core::ProcessCancelSource::HostApi,
771 )
772 .with_reason("requested by host API"),
773 )
774 .await
775 .map_err(EmbedError::Plugin)?;
776 self.runtime.record_process_changed(
777 SessionProcessEventKind::Cancelled,
778 summaries
779 .iter()
780 .map(|summary| summary.process_id.clone())
781 .collect(),
782 );
783 Ok(summaries)
784 }
785
786 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
787 self.with_writer(async |runtime: &mut LashRuntime| {
788 runtime.snapshot_execution_state().await.map_err(Into::into)
789 })
790 .await
791 }
792
793 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
794 self.with_writer(async |runtime: &mut LashRuntime| {
795 runtime
796 .restore_execution_state(bytes)
797 .await
798 .map_err(Into::into)
799 })
800 .await
801 }
802
803 async fn tool_state(&self) -> Result<ToolState> {
804 self.runtime.observe().tool_state.clone().ok_or_else(|| {
805 EmbedError::Session(SessionError::Protocol(
806 "runtime session not available".to_string(),
807 ))
808 })
809 }
810
811 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
812 self.with_writer(async |runtime: &mut LashRuntime| {
813 runtime
814 .apply_tool_state(state)
815 .await
816 .map_err(EmbedError::from)
817 })
818 .await
819 }
820
821 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
822 self.with_writer(async |runtime: &mut LashRuntime| {
823 runtime
824 .restore_tool_state(state)
825 .await
826 .map_err(EmbedError::from)
827 })
828 .await
829 }
830
831 async fn set_tool_membership(&self, tool_id: lash_core::ToolId, present: bool) -> Result<u64> {
832 self.set_tool_membership_many(&[(tool_id, present)]).await
833 }
834
835 async fn set_tool_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
836 let mut state = self.tool_state().await?;
837 for (tool_id, present) in updates {
838 state
839 .set_membership(tool_id, *present)
840 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
841 }
842 self.apply_tool_state(state).await
843 }
844
845 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
846 Ok(self.tool_state().await?.tool_manifests())
847 }
848
849 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
850 let tool_registry = self.tool_registry().await?;
851 let handle = tool_registry
852 .add_tool_provider(provider)
853 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
854 self.refresh_tool_catalog().await?;
855 Ok(handle)
856 }
857
858 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
859 let tool_registry = self.tool_registry().await?;
860 let generation = tool_registry
861 .remove_source(handle)
862 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
863 self.refresh_tool_catalog().await?;
864 Ok(generation)
865 }
866
867 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
868 let writer = self.runtime.writer();
869 let runtime = writer.lock().await;
870 let lifecycle = runtime.session_lifecycle_service()?;
871 lifecycle.create_session(request).await.map_err(Into::into)
872 }
873
874 async fn close_child_session(&self, session_id: &str) -> Result<()> {
875 let writer = self.runtime.writer();
876 let runtime = writer.lock().await;
877 let lifecycle = runtime.session_lifecycle_service()?;
878 lifecycle
879 .close_session(session_id)
880 .await
881 .map_err(Into::into)
882 }
883
884 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
885 self.with_writer(async |runtime: &mut LashRuntime| {
886 runtime
887 .activate_managed_session(session_id)
888 .await
889 .map_err(Into::into)
890 })
891 .await
892 }
893
894 async fn inject_turn_input(&self, id: Option<String>, message: PluginMessage) -> Result<()> {
895 self.inject_turn_inputs(vec![lash_core::InjectedTurnInput { id, message }])
896 .await
897 }
898
899 async fn inject_turn_inputs(&self, messages: Vec<lash_core::InjectedTurnInput>) -> Result<()> {
900 for input in messages {
901 let source_key = input.id.map(|id| format!("injection:{id}"));
902 let turn_input = turn_input_from_plugin_message(input.message);
903 self.runtime
904 .enqueue_turn_input(
905 turn_input,
906 lash_core::DeliveryPolicy::EarliestSafeBoundary,
907 lash_core::SlotPolicy::Join,
908 source_key,
909 )
910 .await
911 .map(|_| ())
912 .map_err(EmbedError::Runtime)?;
913 }
914 Ok(())
915 }
916
917 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
918 self.runtime
919 .writer()
920 .lock()
921 .await
922 .plugin_session()
923 .map(|session| session.tool_registry())
924 .ok_or_else(|| {
925 EmbedError::Session(SessionError::Protocol(
926 "tool registry is unavailable in this runtime session".to_string(),
927 ))
928 })
929 }
930}
931
932fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
933 let mut input = TurnInput::empty();
934 if !message.content.is_empty() {
935 input.items.push(InputItem::Text {
936 text: message.content,
937 });
938 }
939 for (index, bytes) in message.images.into_iter().enumerate() {
940 let id = format!("injected-image-{index}");
941 input.items.push(InputItem::ImageRef { id: id.clone() });
942 input.image_blobs.insert(id, bytes);
943 }
944 input
945}
946
947#[derive(Clone)]
948pub struct SessionConfigAdmin {
949 control: SessionAdmin,
950}
951
952impl SessionConfigAdmin {
953 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
954 self.control.update_config(patch).await
955 }
956
957 pub async fn update_session_config(
958 &self,
959 provider: Option<ProviderHandle>,
960 model: Option<lash_core::ModelSpec>,
961 prompt: Option<PromptLayer>,
962 ) -> Result<()> {
963 self.control
964 .update_session_config(provider, model, prompt)
965 .await
966 }
967
968 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
969 self.control.set_prompt_template(template).await
970 }
971
972 pub async fn clear_prompt_template(&self) -> Result<()> {
973 self.control.clear_prompt_template().await
974 }
975
976 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
977 self.control.add_prompt_contribution(contribution).await
978 }
979
980 pub async fn replace_prompt_slot(
981 &self,
982 slot: PromptSlot,
983 contributions: impl IntoIterator<Item = PromptContribution>,
984 ) -> Result<()> {
985 self.control.replace_prompt_slot(slot, contributions).await
986 }
987
988 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
989 self.control.clear_prompt_slot(slot).await
990 }
991}
992
993#[derive(Clone)]
994pub struct ToolAdmin {
995 control: SessionAdmin,
996}
997
998impl ToolAdmin {
999 pub(crate) fn new(control: SessionAdmin) -> Self {
1000 Self { control }
1001 }
1002}
1003
1004impl ToolAdmin {
1005 pub async fn state(&self) -> Result<ToolState> {
1006 self.control.tool_state().await
1007 }
1008
1009 pub fn advanced(&self) -> AdvancedToolAdmin {
1010 AdvancedToolAdmin {
1011 control: self.control.clone(),
1012 }
1013 }
1014
1015 pub async fn set_membership(
1018 &self,
1019 tool_id: impl Into<lash_core::ToolId>,
1020 present: bool,
1021 ) -> Result<u64> {
1022 self.control
1023 .set_tool_membership(tool_id.into(), present)
1024 .await
1025 }
1026
1027 pub async fn set_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
1028 self.control.set_tool_membership_many(updates).await
1029 }
1030
1031 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
1032 self.control.active_tool_manifests().await
1033 }
1034
1035 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
1036 self.control.add_tool_provider(provider).await
1037 }
1038
1039 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
1040 self.control.remove_tool_source(handle).await
1041 }
1042}
1043
1044#[derive(Clone)]
1045pub struct AdvancedToolAdmin {
1046 control: SessionAdmin,
1047}
1048
1049impl AdvancedToolAdmin {
1050 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
1056 self.control.apply_tool_state(state).await
1057 }
1058
1059 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
1072 self.control.restore_tool_state(state).await
1073 }
1074}
1075
1076#[derive(Clone)]
1077pub struct SessionCommandAdmin {
1078 control: SessionAdmin,
1079}
1080
1081impl SessionCommandAdmin {
1082 pub async fn refresh_tool_catalog(
1087 &self,
1088 reason: impl Into<String>,
1089 idempotency_key: impl Into<String>,
1090 ) -> Result<lash_core::SessionCommandReceipt> {
1091 self.control
1092 .submit_session_command(
1093 lash_core::SessionCommand::RefreshToolCatalog {
1094 reason: reason.into(),
1095 },
1096 idempotency_key,
1097 )
1098 .await
1099 }
1100
1101 pub async fn reset(
1102 &self,
1103 reason: impl Into<String>,
1104 idempotency_key: impl Into<String>,
1105 ) -> Result<lash_core::SessionCommandReceipt> {
1106 self.control
1107 .submit_session_command(
1108 lash_core::SessionCommand::ResetSession {
1109 reason: reason.into(),
1110 },
1111 idempotency_key,
1112 )
1113 .await
1114 }
1115}
1116
1117#[derive(Clone)]
1119pub struct SessionTriggerAdmin {
1120 control: SessionAdmin,
1121}
1122
1123impl SessionTriggerAdmin {
1124 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1130 self.control.list_trigger_registrations().await
1131 }
1132
1133 pub async fn by_source_type(
1138 &self,
1139 source_type: impl Into<lash_core::TriggerEventType>,
1140 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1141 self.control
1142 .trigger_registrations_by_source_type(source_type)
1143 .await
1144 }
1145}
1146
1147#[derive(Clone)]
1148pub struct SessionProcessAdmin {
1149 control: SessionAdmin,
1150}
1151
1152impl SessionProcessAdmin {
1153 pub(crate) fn new(control: SessionAdmin) -> Self {
1154 Self { control }
1155 }
1156
1157 pub async fn start(
1158 &self,
1159 request: lash_core::ProcessStartRequest,
1160 scoped_effect_controller: ScopedEffectController<'_>,
1161 ) -> Result<lash_core::ProcessHandleSummary> {
1162 self.control
1163 .start_process(request, scoped_effect_controller)
1164 .await
1165 }
1166
1167 pub async fn list(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1168 self.control.list_process_handles().await
1169 }
1170
1171 pub async fn list_all(&self) -> Result<Vec<lash_core::ProcessHandleSummary>> {
1172 self.control.list_all_process_handles().await
1173 }
1174
1175 pub async fn await_all(&self) -> Result<()> {
1176 self.control.await_background_work().await
1177 }
1178
1179 pub async fn cancel(
1180 &self,
1181 process_id: &str,
1182 scoped_effect_controller: ScopedEffectController<'_>,
1183 ) -> Result<lash_core::ProcessCancelSummary> {
1184 self.control
1185 .cancel_process(process_id, scoped_effect_controller)
1186 .await
1187 }
1188
1189 pub async fn cancel_all(
1190 &self,
1191 scoped_effect_controller: ScopedEffectController<'_>,
1192 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1193 self.control
1194 .cancel_visible_processes(scoped_effect_controller)
1195 .await
1196 }
1197}
1198
1199#[derive(Clone)]
1200pub struct SessionStateAdmin {
1201 control: SessionAdmin,
1202}
1203
1204impl SessionStateAdmin {
1205 pub async fn export(&self) -> lash_core::SessionSnapshot {
1206 self.control.export_state().await
1207 }
1208
1209 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1210 self.control.append_messages(messages).await
1211 }
1212
1213 pub async fn append_plugin_body(
1214 &self,
1215 plugin_type: impl Into<String>,
1216 body: serde_json::Value,
1217 ) -> Result<()> {
1218 self.control.append_plugin_body(plugin_type, body).await
1219 }
1220
1221 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1222 self.control.set_persisted_state(state).await
1223 }
1224
1225 pub async fn branch_to_node(
1226 &self,
1227 target_leaf: Option<String>,
1228 ) -> Result<lash_core::SessionSnapshot> {
1229 self.control.branch_to_node(target_leaf).await
1230 }
1231
1232 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1233 self.control.persist_current_state().await
1234 }
1235
1236 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1237 self.control.session_state_service().await
1238 }
1239
1240 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1241 self.control.snapshot_execution_state().await
1242 }
1243
1244 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1245 self.control.restore_execution_state(bytes).await
1246 }
1247
1248 pub async fn compact_context(
1249 &self,
1250 instructions: Option<String>,
1251 scoped_effect_controller: ScopedEffectController<'_>,
1252 ) -> Result<bool> {
1253 self.control
1254 .compact_context(instructions, scoped_effect_controller)
1255 .await
1256 }
1257}
1258
1259#[derive(Clone)]
1260pub struct PluginOperations {
1261 pub(crate) control: SessionAdmin,
1262}
1263
1264impl PluginOperations {
1265 pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1266 let (_plugin_id, output) = self
1267 .control
1268 .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1269 .await?;
1270 decode_plugin_output::<Op>(output)
1271 }
1272
1273 pub async fn query_raw(
1274 &self,
1275 name: &str,
1276 args: serde_json::Value,
1277 ) -> Result<(String, serde_json::Value)> {
1278 self.control.query_plugin_raw(name, args).await
1279 }
1280
1281 pub async fn run_command<Op: lash_core::PluginCommand>(
1282 &self,
1283 args: Op::Args,
1284 ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1285 let receipt = self
1286 .control
1287 .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1288 .await?;
1289 Ok(lash_core::PluginCommandReceipt {
1290 output: decode_plugin_output::<Op>(receipt.output)?,
1291 events: receipt.events,
1292 queued_batches: receipt.queued_batches,
1293 })
1294 }
1295
1296 pub async fn run_command_raw(
1297 &self,
1298 name: &str,
1299 args: serde_json::Value,
1300 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1301 self.control.run_plugin_command_raw(name, args).await
1302 }
1303
1304 pub async fn run_task<Op: lash_core::PluginTask>(
1305 &self,
1306 args: Op::Args,
1307 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1308 self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1309 .await
1310 }
1311
1312 pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1313 &self,
1314 args: Op::Args,
1315 cancellation_token: CancellationToken,
1316 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1317 let receipt = self
1318 .control
1319 .run_plugin_task_raw_with_cancel(
1320 Op::NAME,
1321 encode_plugin_args::<Op>(args)?,
1322 cancellation_token,
1323 )
1324 .await?;
1325 Ok(lash_core::PluginTaskReceipt {
1326 output: decode_plugin_output::<Op>(receipt.output)?,
1327 events: receipt.events,
1328 queued_batches: receipt.queued_batches,
1329 })
1330 }
1331
1332 pub async fn run_task_raw(
1333 &self,
1334 name: &str,
1335 args: serde_json::Value,
1336 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1337 self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1338 .await
1339 }
1340
1341 pub async fn run_task_raw_with_cancel(
1342 &self,
1343 name: &str,
1344 args: serde_json::Value,
1345 cancellation_token: CancellationToken,
1346 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1347 self.control
1348 .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1349 .await
1350 }
1351}
1352
1353fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1354 serde_json::to_value(args).map_err(|err| {
1355 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1356 "invalid {} args: {err}",
1357 Op::NAME
1358 )))
1359 })
1360}
1361
1362fn decode_plugin_output<Op: lash_core::PluginOperation>(
1363 output: serde_json::Value,
1364) -> Result<Op::Output> {
1365 serde_json::from_value(output).map_err(|err| {
1366 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1367 "invalid {} output: {err}",
1368 Op::NAME
1369 )))
1370 })
1371}
1372
1373#[derive(Clone)]
1374pub struct ChildSessionAdmin {
1375 control: SessionAdmin,
1376}
1377
1378impl ChildSessionAdmin {
1379 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1380 self.control.create_child_session(request).await
1381 }
1382
1383 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1384 self.control.close_child_session(session_id).await
1385 }
1386
1387 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1388 self.control.activate_managed_session(session_id).await
1389 }
1390}
1391
1392#[derive(Clone)]
1393pub struct InjectionAdmin {
1394 control: SessionAdmin,
1395}
1396
1397impl InjectionAdmin {
1398 pub async fn inject_turn_input(
1399 &self,
1400 id: Option<String>,
1401 message: PluginMessage,
1402 ) -> Result<()> {
1403 self.control.inject_turn_input(id, message).await
1404 }
1405
1406 pub async fn inject_turn_inputs(
1407 &self,
1408 messages: Vec<lash_core::InjectedTurnInput>,
1409 ) -> Result<()> {
1410 self.control.inject_turn_inputs(messages).await
1411 }
1412}
1413
1414#[derive(Clone)]
1415pub struct ProtocolAdmin {
1416 control: SessionAdmin,
1417}
1418
1419impl ProtocolAdmin {
1420 pub async fn apply_session_extension(
1421 &self,
1422 extension: lash_core::ProtocolSessionExtensionHandle,
1423 ) -> Result<()> {
1424 self.control
1425 .apply_protocol_session_extension(extension)
1426 .await
1427 }
1428}