1use super::*;
2
3impl LashRuntime {
4 pub fn unregister_plugin_session(&self) -> Result<(), crate::PluginError> {
5 if let Some(session) = self.session.as_ref() {
6 session
7 .plugins()
8 .host()
9 .unregister_session(&self.state.session_id)?;
10 }
11 Ok(())
12 }
13
14 pub(super) async fn from_host_state(
15 policy: SessionPolicy,
16 host: RuntimeHost,
17 services: RuntimeServices,
18 mut state: RuntimeSessionState,
19 ) -> Result<Self, SessionError> {
20 if state.session_id.is_empty() {
21 state.session_id = uuid::Uuid::new_v4().to_string();
22 }
23 let state_policy_was_unconfigured = state.policy.recorded_provider_id().is_empty()
29 && state.policy.model.id.trim().is_empty();
30 if state_policy_was_unconfigured {
31 state.policy = policy.clone();
32 }
33 state.ensure_agent_frame_initialized();
34 let state_policy = state.policy.clone();
35 if let Some(frame) = state.current_agent_frame_mut()
36 && frame.assignment.policy.recorded_provider_id().is_empty()
37 && frame.assignment.policy.model.id.trim().is_empty()
38 {
39 frame.assignment.policy = state_policy;
40 }
41 state.policy = state.effective_policy().clone();
42 state.protocol_turn_options = state.effective_protocol_turn_options().clone();
43 normalize_session_graph(&mut state);
44 let policy = state.effective_policy().clone();
45 if policy.model.id.trim().is_empty() {
46 return Err(SessionError::Protocol(
47 "session policy missing model spec; hosts must supply explicit model metadata"
48 .to_string(),
49 ));
50 }
51 let mut host = host;
52 if let Some(store) = services.store.clone() {
59 let manifest: Arc<dyn crate::AttachmentManifest> =
60 Arc::new(crate::attachments::PersistenceManifestAdapter(store));
61 let scoped: Arc<dyn crate::AttachmentStore> =
62 Arc::new(crate::SessionScopedAttachmentStore::new(
63 Arc::clone(&host.core.durability.attachment_store),
64 manifest,
65 state.session_id.clone(),
66 ));
67 host.core.durability.attachment_store = scoped;
68 }
69 let services = services
70 .with_attachment_store(Arc::clone(&host.core.durability.attachment_store))
71 .with_process_env_store(Arc::clone(&host.core.durability.process_env_store))
72 .with_clock(Arc::clone(&host.core.clock));
73 let mut session = Session::new(services.clone(), &state.session_id).await?;
74 if let Some(tool_state) = state.tool_state_snapshot.clone() {
75 let report = session
83 .plugins()
84 .tool_registry()
85 .restore_state(tool_state)
86 .map_err(|err| SessionError::Protocol(err.to_string()))?;
87 if !report.orphaned.is_empty() {
88 tracing::warn!(
89 session_id = %state.session_id,
90 orphaned = ?report.orphaned,
91 "session restored with orphaned tools: no registered source \
92 resolves them; they are Off until their source returns"
93 );
94 }
95 }
96 session.refresh_tool_catalog().await?;
97 if let Some(snapshot) = state.plugin_snapshot.clone() {
98 session
99 .plugins()
100 .restore(&snapshot)
101 .map_err(|err| SessionError::Protocol(err.to_string()))?;
102 }
103 let protocol_session = Arc::clone(session.plugins().protocol_session());
104 let session_id = state.session_id.clone();
105 protocol_session
106 .restore_session(
107 crate::plugin::ProtocolSessionContext::new(&mut session, &session_id),
108 &state,
109 )
110 .await?;
111 state.discard_runtime_snapshots();
112 session
113 .plugins()
114 .emit_runtime_event(crate::PluginLifecycleEvent::SessionRestored(
115 crate::SessionReadView::from_persisted_state(&state),
116 ))
117 .await;
118 let protocol_turn_options = state.protocol_turn_options.clone();
119 Ok(Self {
120 session: Some(session),
121 policy,
122 host,
123 services,
124 state,
125 runtime_scope_id: Arc::<str>::from(uuid::Uuid::new_v4().to_string()),
126 managed_sessions: Arc::new(Mutex::new(HashMap::new())),
127 managed_turns: Arc::new(Mutex::new(HashMap::new())),
128 protocol_turn_options,
129 shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
130 process_sync_needed: Arc::new(AtomicBool::new(false)),
131 turn_phase_probe: None,
132 residency: Residency::default(),
133 })
134 }
135
136 pub async fn from_embedded_state(
138 policy: SessionPolicy,
139 host: EmbeddedRuntimeHost,
140 services: RuntimeServices,
141 state: RuntimeSessionState,
142 ) -> Result<Self, SessionError> {
143 Self::from_host_state(policy, host.into(), services, state).await
144 }
145
146 pub async fn from_background_state(
148 policy: SessionPolicy,
149 host: ProcessRuntimeHost,
150 services: RuntimeServices,
151 state: RuntimeSessionState,
152 ) -> Result<Self, SessionError> {
153 Self::from_host_state(policy, host.into(), services, state).await
154 }
155
156 pub async fn from_persistent_embedded_state(
158 policy: SessionPolicy,
159 host: EmbeddedRuntimeHost,
160 services: PersistentRuntimeServices,
161 state: RuntimeSessionState,
162 ) -> Result<Self, SessionError> {
163 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
164 }
165
166 pub async fn from_persistent_background_state(
168 policy: SessionPolicy,
169 host: ProcessRuntimeHost,
170 services: PersistentRuntimeServices,
171 state: RuntimeSessionState,
172 ) -> Result<Self, SessionError> {
173 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
174 }
175
176 pub(crate) async fn assemble_runtime(
187 policy: SessionPolicy,
188 embedded_host: EmbeddedRuntimeHost,
189 plugin_session: Arc<crate::PluginSession>,
190 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
191 process_registry: Option<Arc<dyn ProcessRegistry>>,
192 mut state: RuntimeSessionState,
193 residency: Residency,
194 ) -> Result<Self, SessionError> {
195 if matches!(residency, Residency::ActivePathOnly) && store.is_none() {
198 return Err(SessionError::Protocol(
199 "Residency::ActivePathOnly requires a persistent store — \
200 without one, trimmed orphans are irrecoverable"
201 .to_string(),
202 ));
203 }
204 normalize_session_graph(&mut state);
207 apply_residency_on_load(&mut state, residency);
208 let mut runtime = match (store, process_registry) {
209 (Some(store), Some(registry)) => {
210 let host = ProcessRuntimeHost::new(embedded_host, registry);
211 let services = PersistentRuntimeServices::new(plugin_session, store);
212 Self::from_persistent_background_state(policy, host, services, state).await?
213 }
214 (Some(store), None) => {
215 let services = PersistentRuntimeServices::new(plugin_session, store);
216 Self::from_persistent_embedded_state(policy, embedded_host, services, state).await?
217 }
218 (None, Some(registry)) => {
219 let host = ProcessRuntimeHost::new(embedded_host, registry);
220 let services = RuntimeServices::new(plugin_session);
221 Self::from_background_state(policy, host, services, state).await?
222 }
223 (None, None) => {
224 let services = RuntimeServices::new(plugin_session);
225 Self::from_embedded_state(policy, embedded_host, services, state).await?
226 }
227 };
228 runtime.residency = residency;
229 Ok(runtime)
230 }
231
232 pub async fn from_environment(
248 env: &RuntimeEnvironment,
249 policy: SessionPolicy,
250 state: RuntimeSessionState,
251 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
252 ) -> Result<Self, SessionError> {
253 let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
254 SessionError::Protocol(
255 "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
256 )
257 })?;
258 let plugin_session = plugin_host
259 .build_session(state.session_id.as_str(), state.plugin_snapshot.as_ref())
260 .map_err(|err| SessionError::Protocol(err.to_string()))?;
261 let mut embedded = EmbeddedRuntimeHost::new(env.core.clone());
262 if let Some(factory) = env.session_store_factory.as_ref() {
263 embedded = embedded.with_session_store_factory(Arc::clone(factory));
264 }
265 if let Some(store) = env.trigger_store.as_ref() {
266 embedded = embedded.with_trigger_store(Arc::clone(store));
267 }
268 let mut runtime = Self::assemble_runtime(
269 policy,
270 embedded,
271 plugin_session,
272 store,
273 env.process_registry.as_ref().cloned(),
274 state,
275 env.residency,
276 )
277 .await?;
278 runtime.host.process_work_driver = env.process_work_driver.clone();
281 runtime.host.queued_work_driver = env.queued_work_driver.clone();
282 Ok(runtime)
283 }
284
285 pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
291 let store = self.services.store.clone().ok_or_else(|| {
292 SessionError::Protocol(
293 "park() requires a persistent runtime (store is not set)".to_string(),
294 )
295 })?;
296 let session_id = self.state.session_id.clone();
297 let policy = self.policy.clone();
298 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
300 let result = store.commit_runtime_state(commit).await.map_err(|err| {
301 SessionError::Protocol(format!("failed to persist runtime state: {err}"))
302 })?;
303 self.state.apply_persisted_commit_result(result);
304 Ok(ParkedSession {
309 session_id,
310 store,
311 policy,
312 })
313 }
314
315 pub async fn resume(
320 parked: ParkedSession,
321 env: &RuntimeEnvironment,
322 ) -> Result<Self, SessionError> {
323 let loaded = match env.residency {
329 Residency::KeepAll => {
330 crate::store::load_persisted_session_state(parked.store.as_ref()).await
331 }
332 Residency::ActivePathOnly => {
333 crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
334 .await
335 }
336 }
337 .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
338 let state = loaded.unwrap_or_else(|| RuntimeSessionState {
339 session_id: parked.session_id.clone(),
340 policy: parked.policy.clone(),
341 ..RuntimeSessionState::default()
342 });
343 Self::from_environment(env, parked.policy, state, Some(parked.store)).await
344 }
345
346 pub async fn get_historic_node(
351 &self,
352 node_id: &str,
353 ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
354 if let Some(node) = self.state.session_graph.find_node(node_id) {
355 return Ok(Some(node.clone()));
356 }
357 let store = self.services.store.clone().ok_or_else(|| {
358 SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
359 })?;
360 store
361 .load_node(node_id)
362 .await
363 .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
364 }
365
366 pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
382 let store = self.services.store.clone().ok_or_else(|| {
383 SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
384 })?;
385 let Some(read) = store
386 .load_session(crate::store::SessionReadScope::FullGraph)
387 .await
388 .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
389 else {
390 return Ok(Vec::new());
391 };
392 let active: std::collections::HashSet<&str> = read
393 .graph
394 .active_path_nodes()
395 .iter()
396 .map(|node| node.node_id.as_str())
397 .collect();
398 Ok(read
399 .graph
400 .nodes
401 .iter()
402 .filter(|node| !active.contains(node.node_id.as_str()))
403 .map(|node| node.node_id.clone())
404 .collect())
405 }
406}