1use super::*;
2
3impl LashRuntime {
4 pub fn unregister_plugin_session(&self) -> Result<(), crate::PluginError> {
5 if let Some(session) = self.session.as_ref() {
6 session
7 .plugins()
8 .host()
9 .unregister_session(&self.state.session_id)?;
10 }
11 Ok(())
12 }
13
14 pub(super) async fn from_host_state(
15 policy: SessionPolicy,
16 host: RuntimeHost,
17 services: RuntimeServices,
18 mut state: RuntimeSessionState,
19 ) -> Result<Self, SessionError> {
20 if state.session_id.is_empty() {
21 state.session_id = uuid::Uuid::new_v4().to_string();
22 }
23 let state_policy_was_unconfigured = state.policy.recorded_provider_id().is_empty()
29 && state.policy.model.id.trim().is_empty();
30 if state_policy_was_unconfigured {
31 state.policy = policy.clone();
32 }
33 state.ensure_agent_frame_initialized();
34 let state_policy = state.policy.clone();
35 if let Some(frame) = state.current_agent_frame_mut()
36 && frame.assignment.policy.recorded_provider_id().is_empty()
37 && frame.assignment.policy.model.id.trim().is_empty()
38 {
39 frame.assignment.policy = state_policy;
40 }
41 state.policy = state.effective_policy().clone();
42 state.protocol_turn_options = state.effective_protocol_turn_options().clone();
43 normalize_session_graph(&mut state);
44 let policy = state.effective_policy().clone();
45 if policy.model.id.trim().is_empty() {
46 return Err(SessionError::Protocol(
47 "session policy missing model spec; hosts must supply explicit model metadata"
48 .to_string(),
49 ));
50 }
51 let mut host = host;
52 if let Some(store) = services.store.clone() {
59 let manifest: Arc<dyn crate::AttachmentManifest> =
60 Arc::new(crate::attachments::PersistenceManifestAdapter(store));
61 let scoped: Arc<dyn crate::AttachmentStore> =
62 Arc::new(crate::SessionScopedAttachmentStore::new(
63 Arc::clone(&host.core.durability.attachment_store),
64 manifest,
65 state.session_id.clone(),
66 ));
67 host.core.durability.attachment_store = scoped;
68 }
69 let services =
70 services.with_attachment_store(Arc::clone(&host.core.durability.attachment_store));
71 let services = services.with_lashlang_artifact_store(Arc::clone(
72 &host.core.durability.lashlang_artifact_store,
73 ));
74 let mut session = Session::new(services.clone(), &state.session_id).await?;
75 if let Some(tool_state) = state.tool_state_snapshot.clone() {
76 let report = session
84 .plugins()
85 .tool_registry()
86 .restore_state(tool_state)
87 .map_err(|err| SessionError::Protocol(err.to_string()))?;
88 if !report.orphaned.is_empty() {
89 tracing::warn!(
90 session_id = %state.session_id,
91 orphaned = ?report.orphaned,
92 "session restored with orphaned tools: no registered source \
93 resolves them; they are Off until their source returns"
94 );
95 }
96 }
97 session.refresh_tool_surface().await?;
98 if let Some(snapshot) = state.plugin_snapshot.clone() {
99 session
100 .plugins()
101 .restore(&snapshot)
102 .map_err(|err| SessionError::Protocol(err.to_string()))?;
103 }
104 let protocol_session = Arc::clone(session.plugins().protocol_session());
105 let session_id = state.session_id.clone();
106 protocol_session
107 .restore_session(
108 crate::plugin::ProtocolSessionContext::new(&mut session, &session_id),
109 &state,
110 )
111 .await?;
112 state.discard_runtime_snapshots();
113 session
114 .plugins()
115 .emit_runtime_event(crate::PluginLifecycleEvent::SessionRestored(
116 crate::SessionReadView::from_persisted_state(&state),
117 ))
118 .await;
119 let protocol_turn_options = state.protocol_turn_options.clone();
120 Ok(Self {
121 session: Some(session),
122 policy,
123 host,
124 services,
125 state,
126 runtime_scope_id: Arc::<str>::from(uuid::Uuid::new_v4().to_string()),
127 managed_sessions: Arc::new(Mutex::new(HashMap::new())),
128 managed_turns: Arc::new(Mutex::new(HashMap::new())),
129 protocol_turn_options,
130 shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
131 process_sync_needed: Arc::new(AtomicBool::new(false)),
132 turn_phase_probe: None,
133 residency: Residency::default(),
134 })
135 }
136
137 pub async fn from_embedded_state(
139 policy: SessionPolicy,
140 host: EmbeddedRuntimeHost,
141 services: RuntimeServices,
142 state: RuntimeSessionState,
143 ) -> Result<Self, SessionError> {
144 Self::from_host_state(policy, host.into(), services, state).await
145 }
146
147 pub async fn from_background_state(
149 policy: SessionPolicy,
150 host: ProcessRuntimeHost,
151 services: RuntimeServices,
152 state: RuntimeSessionState,
153 ) -> Result<Self, SessionError> {
154 Self::from_host_state(policy, host.into(), services, state).await
155 }
156
157 pub async fn from_persistent_embedded_state(
159 policy: SessionPolicy,
160 host: EmbeddedRuntimeHost,
161 services: PersistentRuntimeServices,
162 state: RuntimeSessionState,
163 ) -> Result<Self, SessionError> {
164 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
165 }
166
167 pub async fn from_persistent_background_state(
169 policy: SessionPolicy,
170 host: ProcessRuntimeHost,
171 services: PersistentRuntimeServices,
172 state: RuntimeSessionState,
173 ) -> Result<Self, SessionError> {
174 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
175 }
176
177 pub(crate) async fn assemble_runtime(
188 policy: SessionPolicy,
189 embedded_host: EmbeddedRuntimeHost,
190 plugin_session: Arc<crate::PluginSession>,
191 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
192 process_registry: Option<Arc<dyn ProcessRegistry>>,
193 mut state: RuntimeSessionState,
194 residency: Residency,
195 ) -> Result<Self, SessionError> {
196 if matches!(residency, Residency::ActivePathOnly) && store.is_none() {
199 return Err(SessionError::Protocol(
200 "Residency::ActivePathOnly requires a persistent store — \
201 without one, trimmed orphans are irrecoverable"
202 .to_string(),
203 ));
204 }
205 normalize_session_graph(&mut state);
208 apply_residency_on_load(&mut state, residency);
209 let mut runtime = match (store, process_registry) {
210 (Some(store), Some(registry)) => {
211 let host = ProcessRuntimeHost::new(embedded_host, registry);
212 let services = PersistentRuntimeServices::new(plugin_session, store);
213 Self::from_persistent_background_state(policy, host, services, state).await?
214 }
215 (Some(store), None) => {
216 let services = PersistentRuntimeServices::new(plugin_session, store);
217 Self::from_persistent_embedded_state(policy, embedded_host, services, state).await?
218 }
219 (None, Some(registry)) => {
220 let host = ProcessRuntimeHost::new(embedded_host, registry);
221 let services = RuntimeServices::new(plugin_session);
222 Self::from_background_state(policy, host, services, state).await?
223 }
224 (None, None) => {
225 let services = RuntimeServices::new(plugin_session);
226 Self::from_embedded_state(policy, embedded_host, services, state).await?
227 }
228 };
229 runtime.residency = residency;
230 Ok(runtime)
231 }
232
233 pub async fn from_environment(
249 env: &RuntimeEnvironment,
250 policy: SessionPolicy,
251 state: RuntimeSessionState,
252 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
253 ) -> Result<Self, SessionError> {
254 let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
255 SessionError::Protocol(
256 "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
257 )
258 })?;
259 let plugin_host = plugin_host.as_ref().clone().with_lashlang_abilities(
260 super::builder::lashlang_abilities_for_process_registry(
261 plugin_host.lashlang_abilities(),
262 env.process_registry.is_some(),
263 ),
264 );
265 let plugin_session = plugin_host
266 .build_session(state.session_id.as_str(), state.plugin_snapshot.as_ref())
267 .map_err(|err| SessionError::Protocol(err.to_string()))?;
268 let mut embedded = EmbeddedRuntimeHost::new(env.core.clone());
269 if let Some(factory) = env.session_store_factory.as_ref() {
270 embedded = embedded.with_session_store_factory(Arc::clone(factory));
271 }
272 if let Some(store) = env.host_event_store.as_ref() {
273 embedded = embedded.with_host_event_store(Arc::clone(store));
274 }
275 let mut runtime = Self::assemble_runtime(
276 policy,
277 embedded,
278 plugin_session,
279 store,
280 env.process_registry.as_ref().cloned(),
281 state,
282 env.residency,
283 )
284 .await?;
285 runtime.host.process_work_poke = env.process_work_poke.clone();
288 runtime.host.queued_work_poke = env.queued_work_poke.clone();
289 Ok(runtime)
290 }
291
292 pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
298 let store = self.services.store.clone().ok_or_else(|| {
299 SessionError::Protocol(
300 "park() requires a persistent runtime (store is not set)".to_string(),
301 )
302 })?;
303 let session_id = self.state.session_id.clone();
304 let policy = self.policy.clone();
305 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
307 let result = store.commit_runtime_state(commit).await.map_err(|err| {
308 SessionError::Protocol(format!("failed to persist runtime state: {err}"))
309 })?;
310 self.state.apply_persisted_commit_result(result);
311 Ok(ParkedSession {
316 session_id,
317 store,
318 policy,
319 })
320 }
321
322 pub async fn resume(
327 parked: ParkedSession,
328 env: &RuntimeEnvironment,
329 ) -> Result<Self, SessionError> {
330 let loaded = match env.residency {
336 Residency::KeepAll => {
337 crate::store::load_persisted_session_state(parked.store.as_ref()).await
338 }
339 Residency::ActivePathOnly => {
340 crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
341 .await
342 }
343 }
344 .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
345 let state = loaded.unwrap_or_else(|| RuntimeSessionState {
346 session_id: parked.session_id.clone(),
347 policy: parked.policy.clone(),
348 ..RuntimeSessionState::default()
349 });
350 Self::from_environment(env, parked.policy, state, Some(parked.store)).await
351 }
352
353 pub async fn get_historic_node(
358 &self,
359 node_id: &str,
360 ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
361 if let Some(node) = self.state.session_graph.find_node(node_id) {
362 return Ok(Some(node.clone()));
363 }
364 let store = self.services.store.clone().ok_or_else(|| {
365 SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
366 })?;
367 store
368 .load_node(node_id)
369 .await
370 .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
371 }
372
373 pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
389 let store = self.services.store.clone().ok_or_else(|| {
390 SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
391 })?;
392 let Some(read) = store
393 .load_session(crate::store::SessionReadScope::FullGraph)
394 .await
395 .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
396 else {
397 return Ok(Vec::new());
398 };
399 let active: std::collections::HashSet<&str> = read
400 .graph
401 .active_path_nodes()
402 .iter()
403 .map(|node| node.node_id.as_str())
404 .collect();
405 Ok(read
406 .graph
407 .nodes
408 .iter()
409 .filter(|node| !active.contains(node.node_id.as_str()))
410 .map(|node| node.node_id.clone())
411 .collect())
412 }
413}