1pub(crate) use crate::session::SessionConfigPatch;
2use crate::support::*;
3pub use lash_core::{AcceptedInjectedTurnInput, PluginCommand, PluginQuery, PluginTask};
4
5#[derive(Clone)]
6pub struct Completions {
7 pub(crate) core: LashCore,
8}
9
10impl Completions {
11 pub async fn resolve(
12 &self,
13 key: lash_core::AwaitEventKey,
14 resolution: lash_core::Resolution,
15 ) -> Result<lash_core::ResolveOutcome> {
16 self.core
17 .env
18 .core
19 .control
20 .effect_host
21 .resolve_await_event(&key, resolution)
22 .await
23 .map_err(|err| EmbedError::Plugin(lash_core::PluginError::Session(err.to_string())))
24 }
25}
26
27#[derive(Clone)]
28pub struct CoreTriggerAdmin {
29 pub(crate) core: LashCore,
30}
31
32impl CoreTriggerAdmin {
33 pub async fn emit(
34 &self,
35 request: lash_core::TriggerOccurrenceRequest,
36 scoped_effect_controller: ScopedEffectController<'_>,
37 ) -> Result<lash_core::TriggerEmitReport> {
38 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
39 EmbedError::Plugin(lash_core::PluginError::Session(
40 "trigger store is unavailable in this runtime".to_string(),
41 ))
42 })?;
43 let drivers = self.core.work_driver.drivers().await;
44 let router = lash_core::TriggerRouter::new(
45 Arc::clone(store),
46 self.core.env.process_registry.clone(),
47 drivers.process,
48 );
49 router
50 .emit(request, scoped_effect_controller.controller())
51 .await
52 .map_err(Into::into)
53 }
54
55 pub async fn subscriptions(
56 &self,
57 filter: lash_core::TriggerSubscriptionFilter,
58 ) -> Result<Vec<lash_core::TriggerRegistration>> {
59 let store = self.core.env.trigger_store.as_ref().ok_or_else(|| {
60 EmbedError::Plugin(lash_core::PluginError::Session(
61 "trigger store is unavailable in this runtime".to_string(),
62 ))
63 })?;
64 let records = store.list_subscriptions(filter).await?;
65 Ok(records
66 .iter()
67 .map(lash_core::TriggerRegistration::from)
68 .collect())
69 }
70}
71
72#[derive(Clone)]
73pub struct SessionAdmin {
74 pub(crate) runtime: RuntimeHandle,
75}
76
77impl SessionAdmin {
78 pub fn config(&self) -> SessionConfigAdmin {
79 SessionConfigAdmin {
80 control: self.clone(),
81 }
82 }
83
84 pub fn tools(&self) -> ToolAdmin {
85 ToolAdmin {
86 control: self.clone(),
87 }
88 }
89
90 pub fn commands(&self) -> SessionCommandAdmin {
91 SessionCommandAdmin {
92 control: self.clone(),
93 }
94 }
95
96 pub fn triggers(&self) -> SessionTriggerAdmin {
97 SessionTriggerAdmin {
98 control: self.clone(),
99 }
100 }
101
102 pub fn state(&self) -> SessionStateAdmin {
103 SessionStateAdmin {
104 control: self.clone(),
105 }
106 }
107
108 pub fn children(&self) -> ChildSessionAdmin {
109 ChildSessionAdmin {
110 control: self.clone(),
111 }
112 }
113
114 pub fn injection(&self) -> InjectionAdmin {
115 InjectionAdmin {
116 control: self.clone(),
117 }
118 }
119
120 pub fn protocol(&self) -> ProtocolAdmin {
121 ProtocolAdmin {
122 control: self.clone(),
123 }
124 }
125
126 pub fn processes(&self) -> SessionProcessAdmin {
127 SessionProcessAdmin {
128 control: self.clone(),
129 }
130 }
131
132 async fn with_writer<F, T>(&self, f: F) -> T
137 where
138 F: AsyncFnOnce(&mut LashRuntime) -> T,
139 {
140 let writer = self.runtime.writer();
141 let mut runtime = writer.lock().await;
142 let value = f(&mut runtime).await;
143 self.runtime.publish_from(&runtime);
144 value
145 }
146
147 async fn update_config(&self, patch: SessionConfigPatch) -> Result<()> {
148 self.with_writer(async |runtime: &mut LashRuntime| {
149 runtime
150 .update_session_config(patch.provider, patch.model, patch.prompt)
151 .await;
152 })
153 .await;
154 Ok(())
155 }
156
157 async fn export_state(&self) -> lash_core::SessionSnapshot {
158 self.runtime.observe().read_view.to_snapshot()
159 }
160
161 async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
162 self.with_writer(async |runtime: &mut LashRuntime| {
163 runtime
164 .append_session_nodes(lash_core::AppendSessionNodesRequest {
165 nodes: messages
166 .into_iter()
167 .map(lash_core::SessionAppendNode::message)
168 .collect(),
169 requires_ancestor_node_id: None,
170 })
171 .await
172 .map(|_| ())
173 .map_err(Into::into)
174 })
175 .await
176 }
177
178 async fn append_plugin_body(
179 &self,
180 plugin_type: impl Into<String>,
181 body: serde_json::Value,
182 ) -> Result<()> {
183 self.with_writer(async |runtime: &mut LashRuntime| {
184 runtime
185 .append_session_nodes(lash_core::AppendSessionNodesRequest {
186 nodes: vec![lash_core::SessionAppendNode::plugin(plugin_type, body)],
187 requires_ancestor_node_id: None,
188 })
189 .await
190 .map(|_| ())
191 .map_err(Into::into)
192 })
193 .await
194 }
195
196 async fn set_persisted_state(&self, state: RuntimeSessionState) -> Result<()> {
197 self.with_writer(async |runtime: &mut LashRuntime| {
198 runtime.set_persisted_state(state).map_err(Into::into)
199 })
200 .await
201 }
202
203 async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
204 self.with_writer(async |runtime: &mut LashRuntime| {
205 runtime.set_prompt_template(template).await;
206 })
207 .await;
208 Ok(())
209 }
210
211 async fn clear_prompt_template(&self) -> Result<()> {
212 self.with_writer(async |runtime: &mut LashRuntime| {
213 runtime.clear_prompt_template().await;
214 })
215 .await;
216 Ok(())
217 }
218
219 async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
220 self.with_writer(async |runtime: &mut LashRuntime| {
221 runtime.add_prompt_contribution(contribution).await;
222 })
223 .await;
224 Ok(())
225 }
226
227 async fn replace_prompt_slot(
228 &self,
229 slot: PromptSlot,
230 contributions: impl IntoIterator<Item = PromptContribution>,
231 ) -> Result<()> {
232 self.with_writer(async |runtime: &mut LashRuntime| {
233 runtime.replace_prompt_slot(slot, contributions).await;
234 })
235 .await;
236 Ok(())
237 }
238
239 async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
240 self.with_writer(async |runtime: &mut LashRuntime| {
241 runtime.clear_prompt_slot(slot).await;
242 })
243 .await;
244 Ok(())
245 }
246
247 async fn apply_protocol_session_extension(
248 &self,
249 extension: lash_core::ProtocolSessionExtensionHandle,
250 ) -> Result<()> {
251 self.with_writer(async |runtime: &mut LashRuntime| {
252 runtime
253 .apply_protocol_session_extension(extension)
254 .await
255 .map_err(Into::into)
256 })
257 .await
258 }
259
260 async fn branch_to_node(
261 &self,
262 target_leaf: Option<String>,
263 ) -> Result<lash_core::SessionSnapshot> {
264 self.with_writer(async |runtime: &mut LashRuntime| {
265 runtime
266 .branch_to_node(target_leaf)
267 .await
268 .map_err(Into::into)
269 })
270 .await
271 }
272
273 pub(crate) async fn refresh_background_graph(&self) -> Result<()> {
279 self.with_writer(async |runtime: &mut LashRuntime| {
280 runtime.await_background_work().await.map_err(Into::into)
281 })
282 .await
283 }
284
285 fn process_registry(&self) -> Result<Arc<dyn lash_core::ProcessRegistry>> {
286 self.runtime
287 .observe()
288 .process_registry
289 .clone()
290 .ok_or_else(|| {
291 EmbedError::Plugin(lash_core::PluginError::Session(
292 "process registry is unavailable in this runtime".to_string(),
293 ))
294 })
295 }
296
297 fn process_observer(&self) -> Result<lash_core::ProcessWorkObserver> {
298 Ok(lash_core::ProcessWorkObserver::new(
299 self.process_registry()?,
300 ))
301 }
302
303 fn process_observer_opt(&self) -> Option<lash_core::ProcessWorkObserver> {
308 self.runtime
309 .observe()
310 .process_registry
311 .clone()
312 .map(lash_core::ProcessWorkObserver::new)
313 }
314
315 fn process_visible_scopes(&self) -> Vec<lash_core::SessionScope> {
320 let observation = self.runtime.observe();
321 let root = observation.process_scope();
322 let mut scopes = vec![root.clone()];
323 let frame_id = observation.persisted_state.current_agent_frame_id.as_str();
324 if !frame_id.is_empty() {
325 let frame_scope =
326 lash_core::SessionScope::for_agent_frame(observation.session_id(), frame_id);
327 if frame_scope.id() != root.id() {
328 scopes.push(frame_scope);
329 }
330 }
331 scopes
332 }
333
334 async fn signal_process(
335 &self,
336 process_id: &str,
337 signal_name: String,
338 signal_id: String,
339 payload: serde_json::Value,
340 scoped_effect_controller: ScopedEffectController<'_>,
341 ) -> Result<lash_core::ProcessEvent> {
342 let writer = self.runtime.writer();
343 let runtime = writer.lock().await;
344 let session_id = runtime.session_id().to_string();
345 let processes = runtime.process_service()?;
346 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
347 processes
348 .signal(
349 &session_id,
350 process_id,
351 signal_name,
352 signal_id,
353 payload,
354 scope,
355 )
356 .await
357 .map_err(EmbedError::Plugin)
358 }
359
360 async fn transfer_process_handles(
361 &self,
362 to_session_id: &str,
363 process_ids: Vec<String>,
364 scoped_effect_controller: ScopedEffectController<'_>,
365 ) -> Result<()> {
366 let writer = self.runtime.writer();
367 let runtime = writer.lock().await;
368 let session_id = runtime.session_id().to_string();
369 let processes = runtime.process_service()?;
370 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
371 processes
372 .transfer(&session_id, to_session_id, process_ids, scope)
373 .await
374 .map_err(EmbedError::Plugin)
375 }
376
377 async fn await_process_output(
378 &self,
379 process_id: &str,
380 ) -> Result<lash_core::ProcessAwaitOutput> {
381 lash_core::ProcessAwaiter::polling(self.process_registry()?)
382 .await_terminal(process_id)
383 .await
384 .map_err(Into::into)
385 }
386
387 async fn request_process_abandon(
388 &self,
389 process_id: &str,
390 reason: Option<String>,
391 ) -> Result<lash_core::ObservedProcess> {
392 let session_id = self.runtime.observe().session_id().to_string();
393 let request = lash_core::AbandonRequest {
394 requested_by: format!("session:{session_id}"),
395 requested_at_ms: crate::process_admin::now_epoch_ms(),
396 reason,
397 };
398 self.process_registry()?
399 .request_process_abandon(process_id, request)
400 .await?;
401 self.process_observer()?
402 .process(process_id)
403 .await
404 .ok_or_else(|| {
405 EmbedError::Plugin(lash_core::PluginError::Session(format!(
406 "process `{process_id}` vanished after recording its abandon request"
407 )))
408 })
409 }
410
411 async fn refresh_tool_catalog(&self) -> Result<()> {
412 self.with_writer(async |runtime: &mut LashRuntime| {
413 runtime
414 .refresh_session_tool_catalog()
415 .await
416 .map_err(Into::into)
417 })
418 .await
419 }
420
421 async fn submit_session_command(
422 &self,
423 command: lash_core::SessionCommand,
424 idempotency_key: impl Into<String>,
425 ) -> Result<lash_core::SessionCommandReceipt> {
426 let idempotency_key = idempotency_key.into();
427 self.with_writer(async |runtime: &mut LashRuntime| {
428 runtime
429 .submit_session_command(command, idempotency_key)
430 .await
431 .map_err(Into::into)
432 })
433 .await
434 }
435
436 async fn list_trigger_registrations(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
437 self.with_writer(async |runtime: &mut LashRuntime| {
438 runtime
439 .list_trigger_registrations()
440 .await
441 .map_err(Into::into)
442 })
443 .await
444 }
445
446 async fn trigger_registrations_by_source_type(
447 &self,
448 source_type: impl Into<lash_core::TriggerEventType>,
449 ) -> Result<Vec<lash_core::TriggerRegistration>> {
450 self.with_writer(async |runtime: &mut LashRuntime| {
451 runtime
452 .trigger_registrations_by_source_type(source_type)
453 .await
454 .map_err(Into::into)
455 })
456 .await
457 }
458
459 async fn query_plugin_raw(
460 &self,
461 name: &str,
462 args: serde_json::Value,
463 ) -> Result<(String, serde_json::Value)> {
464 let observation = self.runtime.observe();
465 let session_id = observation.session_id().to_string();
466 observation
467 .query_plugin(name, args, Some(session_id))
468 .await
469 .map_err(Into::into)
470 }
471
472 async fn run_plugin_command_raw(
473 &self,
474 name: &str,
475 args: serde_json::Value,
476 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
477 let session_id = self.runtime.observe().session_id().to_string();
478 let writer = self.runtime.writer();
479 let mut runtime = writer.lock().await;
480 let receipt = runtime
481 .run_plugin_command(name, args, Some(session_id))
482 .await?;
483 self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
484 self.runtime.publish_from(&runtime);
485 Ok(receipt)
486 }
487
488 async fn run_plugin_task_raw_with_cancel(
489 &self,
490 name: &str,
491 args: serde_json::Value,
492 cancellation_token: CancellationToken,
493 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
494 let session_id = self.runtime.observe().session_id().to_string();
495 let writer = self.runtime.writer();
496 let mut runtime = writer.lock().await;
497 let scope_id = format!(
498 "{session_id}:plugin_task:{name}:{}",
499 lash_core::TurnActivityId::fresh().0
500 );
501 let scoped_effect_controller = runtime
502 .effect_host()
503 .scoped_static(lash_core::ExecutionScope::runtime_operation(scope_id))
504 .map_err(EmbedError::Runtime)?
505 .ok_or_else(|| {
506 EmbedError::Plugin(lash_core::PluginError::Session(
507 "plugin task execution requires an effect host that can create a static runtime-operation scope".to_string(),
508 ))
509 })?;
510 let receipt = runtime
511 .run_plugin_task(
512 name,
513 args,
514 Some(session_id),
515 scoped_effect_controller,
516 cancellation_token,
517 )
518 .await?;
519 self.record_plugin_operation_observations(&receipt.events, &receipt.pending_turn_inputs);
520 self.runtime.publish_from(&runtime);
521 Ok(receipt)
522 }
523
524 fn record_plugin_operation_observations(
525 &self,
526 events: &[lash_core::PluginOwned<lash_core::PluginRuntimeEvent>],
527 pending_turn_inputs: &[lash_core::PendingTurnInput],
528 ) {
529 for owned in events {
530 self.runtime
531 .record_turn_activity(lash_core::TurnActivity::independent(
532 lash_core::TurnEvent::PluginRuntime {
533 plugin_id: owned.plugin_id.clone(),
534 event: owned.value.clone(),
535 },
536 ));
537 }
538 if !pending_turn_inputs.is_empty() {
539 self.runtime.record_queue_changed(
540 lash_core::SessionQueueEventKind::Enqueued,
541 pending_turn_inputs
542 .iter()
543 .map(|input| input.input_id.clone())
544 .collect(),
545 );
546 }
547 }
548
549 async fn compact_context(
550 &self,
551 instructions: Option<String>,
552 scoped_effect_controller: ScopedEffectController<'_>,
553 ) -> Result<bool> {
554 self.with_writer(async |runtime: &mut LashRuntime| {
555 runtime
556 .compact_context(instructions, scoped_effect_controller)
557 .await
558 .map_err(Into::into)
559 })
560 .await
561 }
562
563 async fn persist_current_state(&self) -> Result<RuntimeSessionState> {
564 self.with_writer(async |runtime: &mut LashRuntime| {
565 runtime.await_background_work().await?;
566 Ok(runtime.export_persisted_state())
567 })
568 .await
569 }
570
571 async fn start_process(
572 &self,
573 request: lash_core::ProcessStartRequest,
574 scoped_effect_controller: ScopedEffectController<'_>,
575 ) -> Result<lash_core::ProcessHandleSummary> {
576 let writer = self.runtime.writer();
577 let runtime = writer.lock().await;
578 let session_id = runtime.session_id().to_string();
579 let processes = runtime.process_service()?;
580 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
581 let summary = processes
582 .start_from_request(&session_id, request, scope)
583 .await
584 .map_err(EmbedError::Plugin)?;
585 self.runtime.record_process_changed(
586 SessionProcessEventKind::Started,
587 vec![summary.process_id.clone()],
588 );
589 Ok(summary)
590 }
591
592 async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
593 self.runtime
594 .writer()
595 .lock()
596 .await
597 .session_state_service()
598 .map_err(Into::into)
599 }
600
601 async fn cancel_process(
602 &self,
603 process_id: &str,
604 scoped_effect_controller: ScopedEffectController<'_>,
605 ) -> Result<lash_core::ProcessCancelSummary> {
606 let writer = self.runtime.writer();
607 let runtime = writer.lock().await;
608 let session_id = runtime.session_id().to_string();
609 let processes = runtime.process_service()?;
610 let cancel_ability = runtime.process_cancel_ability();
611 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
612 let summary = cancel_ability
613 .cancel_summary(
614 processes.as_ref(),
615 lash_core::ProcessCancelRequest::new(
616 &session_id,
617 process_id,
618 scope,
619 lash_core::ProcessCancelSource::HostApi,
620 )
621 .with_reason("requested by host API"),
622 )
623 .await
624 .map_err(EmbedError::Plugin)?;
625 self.runtime.record_process_changed(
626 SessionProcessEventKind::Cancelled,
627 vec![summary.process_id.clone()],
628 );
629 Ok(summary)
630 }
631
632 async fn cancel_visible_processes(
633 &self,
634 scoped_effect_controller: ScopedEffectController<'_>,
635 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
636 let writer = self.runtime.writer();
637 let runtime = writer.lock().await;
638 let session_id = runtime.session_id().to_string();
639 let processes = runtime.process_service()?;
640 let cancel_ability = runtime.process_cancel_ability();
641 let scope = lash_core::ProcessOpScope::new(scoped_effect_controller);
642 let summaries = cancel_ability
643 .cancel_all_visible(
644 processes.as_ref(),
645 lash_core::ProcessCancelAllRequest::new(
646 &session_id,
647 scope,
648 lash_core::ProcessCancelSource::HostApi,
649 )
650 .with_reason("requested by host API"),
651 )
652 .await
653 .map_err(EmbedError::Plugin)?;
654 self.runtime.record_process_changed(
655 SessionProcessEventKind::Cancelled,
656 summaries
657 .iter()
658 .map(|summary| summary.process_id.clone())
659 .collect(),
660 );
661 Ok(summaries)
662 }
663
664 async fn snapshot_execution_state(&self) -> Result<Option<Vec<u8>>> {
665 self.with_writer(async |runtime: &mut LashRuntime| {
666 runtime.snapshot_execution_state().await.map_err(Into::into)
667 })
668 .await
669 }
670
671 async fn restore_execution_state(&self, bytes: &[u8]) -> Result<()> {
672 self.with_writer(async |runtime: &mut LashRuntime| {
673 runtime
674 .restore_execution_state(bytes)
675 .await
676 .map_err(Into::into)
677 })
678 .await
679 }
680
681 async fn tool_state(&self) -> Result<ToolState> {
682 self.runtime.observe().tool_state.clone().ok_or_else(|| {
683 EmbedError::Session(SessionError::Protocol(
684 "runtime session not available".to_string(),
685 ))
686 })
687 }
688
689 async fn apply_tool_state(&self, state: ToolState) -> Result<u64> {
690 self.with_writer(async |runtime: &mut LashRuntime| {
691 runtime
692 .apply_tool_state(state)
693 .await
694 .map_err(EmbedError::from)
695 })
696 .await
697 }
698
699 async fn restore_tool_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
700 self.with_writer(async |runtime: &mut LashRuntime| {
701 runtime
702 .restore_tool_state(state)
703 .await
704 .map_err(EmbedError::from)
705 })
706 .await
707 }
708
709 async fn set_tool_membership(&self, tool_id: lash_core::ToolId, present: bool) -> Result<u64> {
710 self.set_tool_membership_many(&[(tool_id, present)]).await
711 }
712
713 async fn set_tool_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
714 let mut state = self.tool_state().await?;
715 for (tool_id, present) in updates {
716 state
717 .set_membership(tool_id, *present)
718 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
719 }
720 self.apply_tool_state(state).await
721 }
722
723 async fn active_tool_manifests(&self) -> Result<Vec<ToolManifest>> {
724 Ok(self.tool_state().await?.tool_manifests())
725 }
726
727 async fn add_tool_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
728 let tool_registry = self.tool_registry().await?;
729 let handle = tool_registry
730 .add_tool_provider(provider)
731 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
732 self.refresh_tool_catalog().await?;
733 Ok(handle)
734 }
735
736 async fn remove_tool_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
737 let tool_registry = self.tool_registry().await?;
738 let generation = tool_registry
739 .remove_source(handle)
740 .map_err(|err| EmbedError::Session(SessionError::Protocol(err.to_string())))?;
741 self.refresh_tool_catalog().await?;
742 Ok(generation)
743 }
744
745 async fn create_child_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
746 let writer = self.runtime.writer();
747 let runtime = writer.lock().await;
748 let lifecycle = runtime.session_lifecycle_service()?;
749 lifecycle.create_session(request).await.map_err(Into::into)
750 }
751
752 async fn close_child_session(&self, session_id: &str) -> Result<()> {
753 let writer = self.runtime.writer();
754 let runtime = writer.lock().await;
755 let lifecycle = runtime.session_lifecycle_service()?;
756 lifecycle
757 .close_session(session_id)
758 .await
759 .map_err(Into::into)
760 }
761
762 async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
763 self.with_writer(async |runtime: &mut LashRuntime| {
764 runtime
765 .activate_managed_session(session_id)
766 .await
767 .map_err(Into::into)
768 })
769 .await
770 }
771
772 async fn inject_turn_input(
773 &self,
774 turn_id: &str,
775 id: Option<String>,
776 message: PluginMessage,
777 ) -> Result<()> {
778 self.inject_turn_inputs_for_turn(
779 turn_id,
780 vec![lash_core::InjectedTurnInput { id, message }],
781 )
782 .await
783 }
784
785 async fn inject_turn_inputs_for_turn(
786 &self,
787 turn_id: &str,
788 messages: Vec<lash_core::InjectedTurnInput>,
789 ) -> Result<()> {
790 for input in messages {
791 let source_key = input.id.map(|id| format!("injection:{id}"));
792 let turn_input = turn_input_from_plugin_message(input.message);
793 self.runtime
794 .enqueue_turn_input(
795 turn_input,
796 lash_core::TurnInputIngress::active_turn(
797 turn_id,
798 lash_core::TurnInputCheckpointBoundary::AfterWork,
799 ),
800 source_key,
801 )
802 .await
803 .map(|_| ())
804 .map_err(EmbedError::Runtime)?;
805 }
806 Ok(())
807 }
808
809 async fn tool_registry(&self) -> Result<Arc<lash_core::ToolRegistry>> {
810 self.runtime
811 .writer()
812 .lock()
813 .await
814 .plugin_session()
815 .map(|session| session.tool_registry())
816 .ok_or_else(|| {
817 EmbedError::Session(SessionError::Protocol(
818 "tool registry is unavailable in this runtime session".to_string(),
819 ))
820 })
821 }
822}
823
824fn turn_input_from_plugin_message(message: PluginMessage) -> TurnInput {
825 let mut input = TurnInput::empty();
826 if !message.content.is_empty() {
827 input.items.push(InputItem::Text {
828 text: message.content,
829 });
830 }
831 for (index, bytes) in message.images.into_iter().enumerate() {
832 let id = format!("injected-image-{index}");
833 input.items.push(InputItem::ImageRef { id: id.clone() });
834 input.image_blobs.insert(id, bytes);
835 }
836 input
837}
838
839#[derive(Clone)]
840pub struct SessionConfigAdmin {
841 control: SessionAdmin,
842}
843
844impl SessionConfigAdmin {
845 pub async fn update(&self, patch: SessionConfigPatch) -> Result<()> {
846 self.control.update_config(patch).await
847 }
848
849 pub async fn set_prompt_template(&self, template: PromptTemplate) -> Result<()> {
850 self.control.set_prompt_template(template).await
851 }
852
853 pub async fn clear_prompt_template(&self) -> Result<()> {
854 self.control.clear_prompt_template().await
855 }
856
857 pub async fn add_prompt_contribution(&self, contribution: PromptContribution) -> Result<()> {
858 self.control.add_prompt_contribution(contribution).await
859 }
860
861 pub async fn replace_prompt_slot(
862 &self,
863 slot: PromptSlot,
864 contributions: impl IntoIterator<Item = PromptContribution>,
865 ) -> Result<()> {
866 self.control.replace_prompt_slot(slot, contributions).await
867 }
868
869 pub async fn clear_prompt_slot(&self, slot: PromptSlot) -> Result<()> {
870 self.control.clear_prompt_slot(slot).await
871 }
872}
873
874#[derive(Clone)]
875pub struct ToolAdmin {
876 control: SessionAdmin,
877}
878
879impl ToolAdmin {
880 pub(crate) fn new(control: SessionAdmin) -> Self {
881 Self { control }
882 }
883}
884
885impl ToolAdmin {
886 pub async fn state(&self) -> Result<ToolState> {
887 self.control.tool_state().await
888 }
889
890 pub fn advanced(&self) -> AdvancedToolAdmin {
891 AdvancedToolAdmin {
892 control: self.control.clone(),
893 }
894 }
895
896 pub async fn set_membership(
899 &self,
900 tool_id: impl Into<lash_core::ToolId>,
901 present: bool,
902 ) -> Result<u64> {
903 self.control
904 .set_tool_membership(tool_id.into(), present)
905 .await
906 }
907
908 pub async fn set_membership_many(&self, updates: &[(lash_core::ToolId, bool)]) -> Result<u64> {
909 self.control.set_tool_membership_many(updates).await
910 }
911
912 pub async fn active_manifests(&self) -> Result<Vec<ToolManifest>> {
913 self.control.active_tool_manifests().await
914 }
915
916 pub async fn add_provider(&self, provider: Arc<dyn ToolProvider>) -> Result<ToolSourceHandle> {
917 self.control.add_tool_provider(provider).await
918 }
919
920 pub async fn remove_source(&self, handle: &ToolSourceHandle) -> Result<u64> {
921 self.control.remove_tool_source(handle).await
922 }
923}
924
925#[derive(Clone)]
926pub struct AdvancedToolAdmin {
927 control: SessionAdmin,
928}
929
930impl AdvancedToolAdmin {
931 pub async fn apply_state(&self, state: ToolState) -> Result<u64> {
937 self.control.apply_tool_state(state).await
938 }
939
940 pub async fn restore_state(&self, state: ToolState) -> Result<ToolRestoreReport> {
953 self.control.restore_tool_state(state).await
954 }
955}
956
957#[derive(Clone)]
958pub struct SessionCommandAdmin {
959 control: SessionAdmin,
960}
961
962impl SessionCommandAdmin {
963 pub async fn refresh_tool_catalog(
968 &self,
969 reason: impl Into<String>,
970 idempotency_key: impl Into<String>,
971 ) -> Result<lash_core::SessionCommandReceipt> {
972 self.control
973 .submit_session_command(
974 lash_core::SessionCommand::RefreshToolCatalog {
975 reason: reason.into(),
976 },
977 idempotency_key,
978 )
979 .await
980 }
981
982 pub async fn reset(
983 &self,
984 reason: impl Into<String>,
985 idempotency_key: impl Into<String>,
986 ) -> Result<lash_core::SessionCommandReceipt> {
987 self.control
988 .submit_session_command(
989 lash_core::SessionCommand::ResetSession {
990 reason: reason.into(),
991 },
992 idempotency_key,
993 )
994 .await
995 }
996}
997
998#[derive(Clone)]
1000pub struct SessionTriggerAdmin {
1001 control: SessionAdmin,
1002}
1003
1004impl SessionTriggerAdmin {
1005 pub async fn list_all(&self) -> Result<Vec<lash_core::TriggerRegistration>> {
1011 self.control.list_trigger_registrations().await
1012 }
1013
1014 pub async fn by_source_type(
1019 &self,
1020 source_type: impl Into<lash_core::TriggerEventType>,
1021 ) -> Result<Vec<lash_core::TriggerRegistration>> {
1022 self.control
1023 .trigger_registrations_by_source_type(source_type)
1024 .await
1025 }
1026}
1027
1028#[derive(Clone)]
1029pub struct SessionProcessAdmin {
1030 control: SessionAdmin,
1031}
1032
1033impl SessionProcessAdmin {
1043 pub(crate) fn new(control: SessionAdmin) -> Self {
1044 Self { control }
1045 }
1046
1047 async fn list_granted(
1051 &self,
1052 filter: &lash_core::ProcessListFilter,
1053 ) -> Result<Vec<lash_core::ObservedProcess>> {
1054 let Some(observer) = self.control.process_observer_opt() else {
1057 return Ok(Vec::new());
1058 };
1059 let mut seen = std::collections::BTreeSet::new();
1060 let mut out = Vec::new();
1061 for scope in self.control.process_visible_scopes() {
1062 for process in observer.list_granted_to(&scope, filter).await? {
1063 if seen.insert(process.process_id.clone()) {
1064 out.push(process);
1065 }
1066 }
1067 }
1068 Ok(out)
1069 }
1070
1071 pub async fn start(
1072 &self,
1073 request: lash_core::ProcessStartRequest,
1074 scoped_effect_controller: ScopedEffectController<'_>,
1075 ) -> Result<lash_core::ProcessHandleSummary> {
1076 self.control
1077 .start_process(request, scoped_effect_controller)
1078 .await
1079 }
1080
1081 pub async fn list(&self) -> Result<Vec<lash_core::ObservedProcess>> {
1083 self.list_granted(&lash_core::ProcessListFilter {
1084 status: lash_core::ProcessStatusFilter::Running,
1085 ..lash_core::ProcessListFilter::default()
1086 })
1087 .await
1088 }
1089
1090 pub async fn list_all(&self) -> Result<Vec<lash_core::ObservedProcess>> {
1092 self.list_granted(&lash_core::ProcessListFilter {
1093 status: lash_core::ProcessStatusFilter::Any,
1094 ..lash_core::ProcessListFilter::default()
1095 })
1096 .await
1097 }
1098
1099 pub async fn get(&self, process_id: &str) -> Result<Option<lash_core::ObservedProcess>> {
1101 Ok(self
1102 .list_all()
1103 .await?
1104 .into_iter()
1105 .find(|process| process.process_id == process_id))
1106 }
1107
1108 pub async fn events(
1109 &self,
1110 process_id: &str,
1111 after_sequence: u64,
1112 ) -> Result<Vec<lash_core::ObservedProcessEvent>> {
1113 let Some(observer) = self.control.process_observer_opt() else {
1114 return Ok(Vec::new());
1115 };
1116 observer
1117 .events_after(process_id, after_sequence)
1118 .await
1119 .map_err(Into::into)
1120 }
1121
1122 pub async fn await_output(&self, process_id: &str) -> Result<lash_core::ProcessAwaitOutput> {
1123 self.control.await_process_output(process_id).await
1124 }
1125
1126 pub async fn signal(
1127 &self,
1128 process_id: &str,
1129 signal_name: impl Into<String>,
1130 signal_id: impl Into<String>,
1131 payload: serde_json::Value,
1132 scoped_effect_controller: ScopedEffectController<'_>,
1133 ) -> Result<lash_core::ProcessEvent> {
1134 self.control
1135 .signal_process(
1136 process_id,
1137 signal_name.into(),
1138 signal_id.into(),
1139 payload,
1140 scoped_effect_controller,
1141 )
1142 .await
1143 }
1144
1145 pub async fn cancel(
1146 &self,
1147 process_id: &str,
1148 scoped_effect_controller: ScopedEffectController<'_>,
1149 ) -> Result<lash_core::ProcessCancelSummary> {
1150 self.control
1151 .cancel_process(process_id, scoped_effect_controller)
1152 .await
1153 }
1154
1155 pub async fn cancel_all(
1156 &self,
1157 scoped_effect_controller: ScopedEffectController<'_>,
1158 ) -> Result<Vec<lash_core::ProcessCancelSummary>> {
1159 self.control
1160 .cancel_visible_processes(scoped_effect_controller)
1161 .await
1162 }
1163
1164 pub async fn transfer(
1167 &self,
1168 to_session_id: &str,
1169 process_ids: Vec<String>,
1170 scoped_effect_controller: ScopedEffectController<'_>,
1171 ) -> Result<()> {
1172 self.control
1173 .transfer_process_handles(to_session_id, process_ids, scoped_effect_controller)
1174 .await
1175 }
1176
1177 pub async fn request_abandon(
1181 &self,
1182 process_id: &str,
1183 reason: Option<String>,
1184 ) -> Result<lash_core::ObservedProcess> {
1185 self.control
1186 .request_process_abandon(process_id, reason)
1187 .await
1188 }
1189}
1190
1191#[derive(Clone)]
1192pub struct SessionStateAdmin {
1193 control: SessionAdmin,
1194}
1195
1196impl SessionStateAdmin {
1197 pub async fn export(&self) -> lash_core::SessionSnapshot {
1198 self.control.export_state().await
1199 }
1200
1201 pub async fn append_messages(&self, messages: Vec<PluginMessage>) -> Result<()> {
1202 self.control.append_messages(messages).await
1203 }
1204
1205 pub async fn append_plugin_body(
1206 &self,
1207 plugin_type: impl Into<String>,
1208 body: serde_json::Value,
1209 ) -> Result<()> {
1210 self.control.append_plugin_body(plugin_type, body).await
1211 }
1212
1213 pub async fn set_persisted(&self, state: RuntimeSessionState) -> Result<()> {
1214 self.control.set_persisted_state(state).await
1215 }
1216
1217 pub async fn branch_to_node(
1218 &self,
1219 target_leaf: Option<String>,
1220 ) -> Result<lash_core::SessionSnapshot> {
1221 self.control.branch_to_node(target_leaf).await
1222 }
1223
1224 pub async fn persist_current(&self) -> Result<RuntimeSessionState> {
1225 self.control.persist_current_state().await
1226 }
1227
1228 pub async fn session_state_service(&self) -> Result<Arc<dyn SessionStateService>> {
1229 self.control.session_state_service().await
1230 }
1231
1232 pub async fn snapshot_execution(&self) -> Result<Option<Vec<u8>>> {
1233 self.control.snapshot_execution_state().await
1234 }
1235
1236 pub async fn restore_execution(&self, bytes: &[u8]) -> Result<()> {
1237 self.control.restore_execution_state(bytes).await
1238 }
1239
1240 pub async fn compact_context(
1241 &self,
1242 instructions: Option<String>,
1243 scoped_effect_controller: ScopedEffectController<'_>,
1244 ) -> Result<bool> {
1245 self.control
1246 .compact_context(instructions, scoped_effect_controller)
1247 .await
1248 }
1249}
1250
1251#[derive(Clone)]
1252pub struct PluginOperations {
1253 pub(crate) control: SessionAdmin,
1254}
1255
1256impl PluginOperations {
1257 pub async fn query<Op: lash_core::PluginQuery>(&self, args: Op::Args) -> Result<Op::Output> {
1258 let (_plugin_id, output) = self
1259 .control
1260 .query_plugin_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1261 .await?;
1262 decode_plugin_output::<Op>(output)
1263 }
1264
1265 pub async fn query_raw(
1266 &self,
1267 name: &str,
1268 args: serde_json::Value,
1269 ) -> Result<(String, serde_json::Value)> {
1270 self.control.query_plugin_raw(name, args).await
1271 }
1272
1273 pub async fn run_command<Op: lash_core::PluginCommand>(
1274 &self,
1275 args: Op::Args,
1276 ) -> Result<lash_core::PluginCommandReceipt<Op::Output>> {
1277 let receipt = self
1278 .control
1279 .run_plugin_command_raw(Op::NAME, encode_plugin_args::<Op>(args)?)
1280 .await?;
1281 Ok(lash_core::PluginCommandReceipt {
1282 output: decode_plugin_output::<Op>(receipt.output)?,
1283 events: receipt.events,
1284 pending_turn_inputs: receipt.pending_turn_inputs,
1285 })
1286 }
1287
1288 pub async fn run_command_raw(
1289 &self,
1290 name: &str,
1291 args: serde_json::Value,
1292 ) -> Result<lash_core::PluginCommandReceipt<serde_json::Value>> {
1293 self.control.run_plugin_command_raw(name, args).await
1294 }
1295
1296 pub async fn run_task<Op: lash_core::PluginTask>(
1297 &self,
1298 args: Op::Args,
1299 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1300 self.run_task_with_cancel::<Op>(args, CancellationToken::new())
1301 .await
1302 }
1303
1304 pub async fn run_task_with_cancel<Op: lash_core::PluginTask>(
1305 &self,
1306 args: Op::Args,
1307 cancellation_token: CancellationToken,
1308 ) -> Result<lash_core::PluginTaskReceipt<Op::Output>> {
1309 let receipt = self
1310 .control
1311 .run_plugin_task_raw_with_cancel(
1312 Op::NAME,
1313 encode_plugin_args::<Op>(args)?,
1314 cancellation_token,
1315 )
1316 .await?;
1317 Ok(lash_core::PluginTaskReceipt {
1318 output: decode_plugin_output::<Op>(receipt.output)?,
1319 events: receipt.events,
1320 pending_turn_inputs: receipt.pending_turn_inputs,
1321 })
1322 }
1323
1324 pub async fn run_task_raw(
1325 &self,
1326 name: &str,
1327 args: serde_json::Value,
1328 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1329 self.run_task_raw_with_cancel(name, args, CancellationToken::new())
1330 .await
1331 }
1332
1333 pub async fn run_task_raw_with_cancel(
1334 &self,
1335 name: &str,
1336 args: serde_json::Value,
1337 cancellation_token: CancellationToken,
1338 ) -> Result<lash_core::PluginTaskReceipt<serde_json::Value>> {
1339 self.control
1340 .run_plugin_task_raw_with_cancel(name, args, cancellation_token)
1341 .await
1342 }
1343}
1344
1345fn encode_plugin_args<Op: lash_core::PluginOperation>(args: Op::Args) -> Result<serde_json::Value> {
1346 serde_json::to_value(args).map_err(|err| {
1347 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1348 "invalid {} args: {err}",
1349 Op::NAME
1350 )))
1351 })
1352}
1353
1354fn decode_plugin_output<Op: lash_core::PluginOperation>(
1355 output: serde_json::Value,
1356) -> Result<Op::Output> {
1357 serde_json::from_value(output).map_err(|err| {
1358 EmbedError::Plugin(lash_core::PluginError::Invoke(format!(
1359 "invalid {} output: {err}",
1360 Op::NAME
1361 )))
1362 })
1363}
1364
1365#[derive(Clone)]
1366pub struct ChildSessionAdmin {
1367 control: SessionAdmin,
1368}
1369
1370impl ChildSessionAdmin {
1371 pub async fn create_session(&self, request: SessionCreateRequest) -> Result<SessionHandle> {
1372 self.control.create_child_session(request).await
1373 }
1374
1375 pub async fn close_session(&self, session_id: &str) -> Result<()> {
1376 self.control.close_child_session(session_id).await
1377 }
1378
1379 pub async fn activate_managed_session(&self, session_id: &str) -> Result<()> {
1380 self.control.activate_managed_session(session_id).await
1381 }
1382}
1383
1384#[derive(Clone)]
1385pub struct InjectionAdmin {
1386 control: SessionAdmin,
1387}
1388
1389impl InjectionAdmin {
1390 pub async fn inject_turn_input(
1391 &self,
1392 turn_id: &str,
1393 id: Option<String>,
1394 message: PluginMessage,
1395 ) -> Result<()> {
1396 self.control.inject_turn_input(turn_id, id, message).await
1397 }
1398
1399 pub async fn inject_turn_inputs_for_turn(
1400 &self,
1401 turn_id: &str,
1402 messages: Vec<lash_core::InjectedTurnInput>,
1403 ) -> Result<()> {
1404 self.control
1405 .inject_turn_inputs_for_turn(turn_id, messages)
1406 .await
1407 }
1408}
1409
1410#[derive(Clone)]
1411pub struct ProtocolAdmin {
1412 control: SessionAdmin,
1413}
1414
1415impl ProtocolAdmin {
1416 pub async fn apply_session_extension(
1417 &self,
1418 extension: lash_core::ProtocolSessionExtensionHandle,
1419 ) -> Result<()> {
1420 self.control
1421 .apply_protocol_session_extension(extension)
1422 .await
1423 }
1424}