1use super::*;
2
3impl LashRuntime {
4 pub fn set_runtime_lease_owner(&mut self, owner: crate::LeaseOwnerIdentity) {
11 self.runtime_lease_owner = owner;
12 }
13
14 pub fn unregister_plugin_session(&self) -> Result<(), crate::PluginError> {
15 if let Some(session) = self.session.as_ref() {
16 session
17 .plugins()
18 .host()
19 .unregister_session(&self.state.session_id)?;
20 }
21 Ok(())
22 }
23
24 pub(super) async fn from_host_state(
25 policy: SessionPolicy,
26 host: RuntimeHost,
27 services: RuntimeServices,
28 mut state: RuntimeSessionState,
29 ) -> Result<Self, SessionError> {
30 if state.session_id.is_empty() {
31 state.session_id = uuid::Uuid::new_v4().to_string();
32 }
33 let state_policy_was_unconfigured = state.policy.recorded_provider_id().is_empty()
39 && state.policy.model.id.trim().is_empty();
40 if state_policy_was_unconfigured {
41 state.policy = policy.clone();
42 }
43 state.ensure_agent_frame_initialized();
44 let state_policy = state.policy.clone();
45 if let Some(frame) = state.current_agent_frame_mut()
46 && frame.assignment.policy.recorded_provider_id().is_empty()
47 && frame.assignment.policy.model.id.trim().is_empty()
48 {
49 frame.assignment.policy = state_policy;
50 }
51 state.policy = state.effective_policy().clone();
52 state.protocol_turn_options = state.effective_protocol_turn_options().clone();
53 normalize_session_graph(&mut state);
54 let policy = state.effective_policy().clone();
55 if policy.model.id.trim().is_empty() {
56 return Err(SessionError::Protocol(
57 "session policy missing model spec; hosts must supply explicit model metadata"
58 .to_string(),
59 ));
60 }
61 let mut host = host;
62 if let Some(store) = services.store.clone() {
69 let manifest: Arc<dyn crate::AttachmentManifest> =
70 Arc::new(crate::attachments::PersistenceManifestAdapter(store));
71 let scoped: Arc<dyn crate::AttachmentStore> =
72 Arc::new(crate::SessionScopedAttachmentStore::new(
73 Arc::clone(&host.core.durability.attachment_store),
74 manifest,
75 state.session_id.clone(),
76 ));
77 host.core.durability.attachment_store = scoped;
78 }
79 let services = services
80 .with_attachment_store(Arc::clone(&host.core.durability.attachment_store))
81 .with_process_env_store(Arc::clone(&host.core.durability.process_env_store))
82 .with_clock(Arc::clone(&host.core.clock));
83 let mut session = Session::new(services.clone(), &state.session_id).await?;
84 if let Some(tool_state) = state.tool_state_snapshot.clone() {
85 let report = session
93 .plugins()
94 .tool_registry()
95 .restore_state(tool_state)
96 .map_err(|err| SessionError::Protocol(err.to_string()))?;
97 if !report.orphaned.is_empty() {
98 tracing::warn!(
99 session_id = %state.session_id,
100 orphaned = ?report.orphaned,
101 "session restored with orphaned tools: no registered source \
102 resolves them; they remain non-members until their source returns"
103 );
104 }
105 }
106 session.refresh_tool_catalog().await?;
107 if let Some(snapshot) = state.plugin_snapshot.clone() {
108 session
109 .plugins()
110 .restore(&snapshot)
111 .map_err(|err| SessionError::Protocol(err.to_string()))?;
112 }
113 let protocol_session = Arc::clone(session.plugins().protocol_session());
114 let session_id = state.session_id.clone();
115 protocol_session
116 .restore_session(
117 crate::plugin::ProtocolSessionContext::new(&mut session, &session_id),
118 &state,
119 )
120 .await?;
121 state.discard_runtime_snapshots();
122 session
123 .plugins()
124 .emit_runtime_event(crate::PluginLifecycleEvent::SessionRestored(
125 crate::SessionReadView::from_persisted_state(&state),
126 ))
127 .await;
128 let protocol_turn_options = state.protocol_turn_options.clone();
129 let runtime_scope_id = uuid::Uuid::new_v4().to_string();
130 let runtime_lease_owner = crate::LeaseOwnerIdentity::opaque(
131 runtime_scope_id.clone(),
132 uuid::Uuid::new_v4().to_string(),
133 );
134 Ok(Self {
135 session: Some(session),
136 policy,
137 host,
138 services,
139 state,
140 runtime_scope_id: Arc::<str>::from(runtime_scope_id),
141 runtime_lease_owner,
142 managed_sessions: Arc::new(Mutex::new(HashMap::new())),
143 managed_turns: Arc::new(Mutex::new(HashMap::new())),
144 protocol_turn_options,
145 shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
146 process_sync_needed: Arc::new(AtomicBool::new(false)),
147 turn_phase_probe: None,
148 residency: Residency::default(),
149 })
150 }
151
152 pub async fn from_embedded_state(
154 policy: SessionPolicy,
155 host: EmbeddedRuntimeHost,
156 services: RuntimeServices,
157 state: RuntimeSessionState,
158 ) -> Result<Self, SessionError> {
159 Self::from_host_state(policy, host.into(), services, state).await
160 }
161
162 pub async fn from_background_state(
164 policy: SessionPolicy,
165 host: ProcessRuntimeHost,
166 services: RuntimeServices,
167 state: RuntimeSessionState,
168 ) -> Result<Self, SessionError> {
169 Self::from_host_state(policy, host.into(), services, state).await
170 }
171
172 pub async fn from_persistent_embedded_state(
174 policy: SessionPolicy,
175 host: EmbeddedRuntimeHost,
176 services: PersistentRuntimeServices,
177 state: RuntimeSessionState,
178 ) -> Result<Self, SessionError> {
179 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
180 }
181
182 pub async fn from_persistent_background_state(
184 policy: SessionPolicy,
185 host: ProcessRuntimeHost,
186 services: PersistentRuntimeServices,
187 state: RuntimeSessionState,
188 ) -> Result<Self, SessionError> {
189 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
190 }
191
192 pub(crate) async fn assemble_runtime(
203 policy: SessionPolicy,
204 embedded_host: EmbeddedRuntimeHost,
205 plugin_session: Arc<crate::PluginSession>,
206 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
207 process_registry: Option<Arc<dyn ProcessRegistry>>,
208 mut state: RuntimeSessionState,
209 residency: Residency,
210 ) -> Result<Self, SessionError> {
211 if matches!(residency, Residency::ActivePathOnly) && store.is_none() {
214 return Err(SessionError::Protocol(
215 "Residency::ActivePathOnly requires a persistent store — \
216 without one, trimmed orphans are irrecoverable"
217 .to_string(),
218 ));
219 }
220 normalize_session_graph(&mut state);
223 apply_residency_on_load(&mut state, residency);
224 let mut runtime = match (store, process_registry) {
225 (Some(store), Some(registry)) => {
226 let host = ProcessRuntimeHost::new(embedded_host, registry);
227 let services = PersistentRuntimeServices::new(plugin_session, store);
228 Self::from_persistent_background_state(policy, host, services, state).await?
229 }
230 (Some(store), None) => {
231 let services = PersistentRuntimeServices::new(plugin_session, store);
232 Self::from_persistent_embedded_state(policy, embedded_host, services, state).await?
233 }
234 (None, Some(registry)) => {
235 let host = ProcessRuntimeHost::new(embedded_host, registry);
236 let services = RuntimeServices::new(plugin_session);
237 Self::from_background_state(policy, host, services, state).await?
238 }
239 (None, None) => {
240 let services = RuntimeServices::new(plugin_session);
241 Self::from_embedded_state(policy, embedded_host, services, state).await?
242 }
243 };
244 runtime.residency = residency;
245 Ok(runtime)
246 }
247
248 pub async fn from_environment(
264 env: &RuntimeEnvironment,
265 policy: SessionPolicy,
266 state: RuntimeSessionState,
267 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
268 ) -> Result<Self, SessionError> {
269 let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
270 SessionError::Protocol(
271 "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
272 )
273 })?;
274 let plugin_session = plugin_host
275 .build_session(state.session_id.as_str(), state.plugin_snapshot.as_ref())
276 .map_err(|err| SessionError::Protocol(err.to_string()))?;
277 let mut embedded = EmbeddedRuntimeHost::new(env.core.clone());
278 if let Some(factory) = env.session_store_factory.as_ref() {
279 embedded = embedded.with_session_store_factory(Arc::clone(factory));
280 }
281 if let Some(store) = env.trigger_store.as_ref() {
282 embedded = embedded.with_trigger_store(Arc::clone(store));
283 }
284 let mut runtime = Self::assemble_runtime(
285 policy,
286 embedded,
287 plugin_session,
288 store,
289 env.process_registry.as_ref().cloned(),
290 state,
291 env.residency,
292 )
293 .await?;
294 runtime.host.process_work_driver = env.process_work_driver.clone();
297 runtime.host.queued_work_driver = env.queued_work_driver.clone();
298 Ok(runtime)
299 }
300
301 pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
307 let store = self.services.store.clone().ok_or_else(|| {
308 SessionError::Protocol(
309 "park() requires a persistent runtime (store is not set)".to_string(),
310 )
311 })?;
312 let session_id = self.state.session_id.clone();
313 let policy = self.policy.clone();
314 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
316 let result = commit_runtime_state_with_fresh_session_execution_lease(
317 Arc::clone(&store),
318 commit,
319 &self.runtime_lease_owner,
320 Arc::clone(&self.host.core.clock),
321 )
322 .await
323 .map_err(|err| SessionError::Protocol(format!("failed to persist runtime state: {err}")))?;
324 self.state.apply_persisted_commit_result(result);
325 Ok(ParkedSession {
330 session_id,
331 store,
332 policy,
333 })
334 }
335
336 pub async fn resume(
341 parked: ParkedSession,
342 env: &RuntimeEnvironment,
343 ) -> Result<Self, SessionError> {
344 let loaded = match env.residency {
350 Residency::KeepAll => {
351 crate::store::load_persisted_session_state(parked.store.as_ref()).await
352 }
353 Residency::ActivePathOnly => {
354 crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
355 .await
356 }
357 }
358 .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
359 let state = loaded.unwrap_or_else(|| RuntimeSessionState {
360 session_id: parked.session_id.clone(),
361 policy: parked.policy.clone(),
362 ..RuntimeSessionState::default()
363 });
364 Self::from_environment(env, parked.policy, state, Some(parked.store)).await
365 }
366
367 pub async fn get_historic_node(
372 &self,
373 node_id: &str,
374 ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
375 if let Some(node) = self.state.session_graph.find_node(node_id) {
376 return Ok(Some(node.clone()));
377 }
378 let store = self.services.store.clone().ok_or_else(|| {
379 SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
380 })?;
381 store
382 .load_node(node_id)
383 .await
384 .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
385 }
386
387 pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
403 let store = self.services.store.clone().ok_or_else(|| {
404 SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
405 })?;
406 let Some(read) = store
407 .load_session(crate::store::SessionReadScope::FullGraph)
408 .await
409 .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
410 else {
411 return Ok(Vec::new());
412 };
413 let active: std::collections::HashSet<&str> = read
414 .graph
415 .active_path_nodes()
416 .iter()
417 .map(|node| node.node_id.as_str())
418 .collect();
419 Ok(read
420 .graph
421 .nodes
422 .iter()
423 .filter(|node| !active.contains(node.node_id.as_str()))
424 .map(|node| node.node_id.clone())
425 .collect())
426 }
427}