1use super::*;
2
3impl LashRuntime {
4 pub(super) async fn from_host_state(
5 policy: SessionPolicy,
6 host: RuntimeHost,
7 services: RuntimeServices,
8 mut state: PersistedSessionState,
9 ) -> Result<Self, SessionError> {
10 if state.session_id.is_empty() {
11 state.session_id = uuid::Uuid::new_v4().to_string();
12 }
13 if state.policy.provider.kind() == "unconfigured" {
19 state.policy = policy.clone();
20 }
21 normalize_session_graph(&mut state);
22 if policy.max_context_tokens.is_none() {
23 return Err(SessionError::Protocol(
24 "session policy missing max_context_tokens; hosts must supply explicit model metadata"
25 .to_string(),
26 ));
27 }
28 let services = services.with_attachment_store(Arc::clone(&host.core.attachment_store));
29 let mut session = Session::new(
30 services.clone(),
31 &state.session_id,
32 state.policy.execution_mode.clone(),
33 )
34 .await?;
35 if let Some(tool_state) = state.tool_state_snapshot.clone()
36 && let Err(err) = session.plugins().tool_registry().apply_state(tool_state)
37 {
38 tracing::warn!("failed to restore tool state from checkpoint: {err}");
39 }
40 if let Some(snapshot) = state.plugin_snapshot.clone() {
41 session
42 .plugins()
43 .restore(&snapshot)
44 .map_err(|err| SessionError::Protocol(err.to_string()))?;
45 }
46 let mode_session = Arc::clone(session.plugins().mode_session());
47 let session_id = state.session_id.clone();
48 mode_session
49 .restore_session(
50 crate::plugin::ModeSessionContext::new(&mut session, &session_id),
51 &state,
52 )
53 .await?;
54 state.discard_runtime_snapshots();
55 session
56 .plugins()
57 .emit_runtime_event(crate::PluginRuntimeEvent::SessionRestored(
58 crate::SessionReadView::from_persisted_state(&state),
59 ))
60 .await;
61 let mode_turn_options = state.mode_turn_options.clone();
62 Ok(Self {
63 session: Some(session),
64 policy,
65 host,
66 services,
67 state,
68 runtime_scope_id: Arc::<str>::from(uuid::Uuid::new_v4().to_string()),
69 managed_sessions: Arc::new(Mutex::new(HashMap::new())),
70 active_handoff_continuations: Arc::new(Mutex::new(HashMap::new())),
71 managed_turns: Arc::new(Mutex::new(HashMap::new())),
72 mode_turn_options,
73 shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
74 background_sync_needed: Arc::new(AtomicBool::new(false)),
75 pending_first_turn_inputs: Arc::new(std::sync::Mutex::new(HashMap::new())),
76 turn_phase_probe: None,
77 })
78 }
79
80 pub async fn from_embedded_state(
82 policy: SessionPolicy,
83 host: EmbeddedRuntimeHost,
84 services: RuntimeServices,
85 state: PersistedSessionState,
86 ) -> Result<Self, SessionError> {
87 Self::from_host_state(policy, host.into(), services, state).await
88 }
89
90 pub async fn from_background_state(
92 policy: SessionPolicy,
93 host: BackgroundRuntimeHost,
94 services: RuntimeServices,
95 state: PersistedSessionState,
96 ) -> Result<Self, SessionError> {
97 Self::from_host_state(policy, host.into(), services, state).await
98 }
99
100 pub async fn from_persistent_embedded_state(
102 policy: SessionPolicy,
103 host: EmbeddedRuntimeHost,
104 services: PersistentRuntimeServices,
105 state: PersistedSessionState,
106 ) -> Result<Self, SessionError> {
107 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
108 }
109
110 pub async fn from_persistent_background_state(
112 policy: SessionPolicy,
113 host: BackgroundRuntimeHost,
114 services: PersistentRuntimeServices,
115 state: PersistedSessionState,
116 ) -> Result<Self, SessionError> {
117 Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
118 }
119
120 pub async fn from_environment(
136 env: &RuntimeEnvironment,
137 policy: SessionPolicy,
138 mut state: PersistedSessionState,
139 store: Option<Arc<dyn crate::store::RuntimePersistence>>,
140 ) -> Result<Self, SessionError> {
141 if matches!(env.residency, Residency::ActivePathOnly) && store.is_none() {
144 return Err(SessionError::Protocol(
145 "Residency::ActivePathOnly requires a persistent store — \
146 without one, trimmed orphans are irrecoverable"
147 .to_string(),
148 ));
149 }
150 normalize_session_graph(&mut state);
155 apply_residency_on_load(&mut state, env.residency);
156 let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
157 SessionError::Protocol(
158 "RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
159 )
160 })?;
161 let plugin_session = plugin_host
162 .build_session(
163 state.session_id.as_str(),
164 policy.execution_mode.clone(),
165 policy.standard_context_approach.clone(),
166 state.plugin_snapshot.as_ref(),
167 )
168 .map_err(|err| SessionError::Protocol(err.to_string()))?;
169 let core = RuntimeCoreConfig {
170 attachment_store: Arc::clone(&env.attachment_store),
171 prompt: env.prompt.clone(),
172 trace_sink: env.trace_sink.clone(),
173 trace_level: env.trace_level,
174 trace_context: env.trace_context.clone(),
175 termination: env.termination.clone(),
176 };
177 let mut embedded = EmbeddedRuntimeHost::new(core);
178 if let Some(factory) = env.session_store_factory.as_ref() {
179 embedded = embedded.with_session_store_factory(Arc::clone(factory));
180 }
181 let runtime = if let Some(store) = store {
182 let services = PersistentRuntimeServices::new_with_bridges(
183 plugin_session,
184 crate::session::TurnInjectionBridge::new(),
185 crate::session::TurnInputInjectionBridge::new(),
186 store,
187 );
188 match env.background_task_host.as_ref() {
189 Some(executor) => {
190 let host = BackgroundRuntimeHost::new(embedded, Arc::clone(executor));
191 Self::from_persistent_background_state(policy, host, services, state).await?
192 }
193 None => {
194 Self::from_persistent_embedded_state(policy, embedded, services, state).await?
195 }
196 }
197 } else {
198 let services = RuntimeServices::new(plugin_session);
199 match env.background_task_host.as_ref() {
200 Some(executor) => {
201 let host = BackgroundRuntimeHost::new(embedded, Arc::clone(executor));
202 Self::from_background_state(policy, host, services, state).await?
203 }
204 None => Self::from_embedded_state(policy, embedded, services, state).await?,
205 }
206 };
207 Ok(runtime)
208 }
209
210 pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
216 let store = self.services.store.clone().ok_or_else(|| {
217 SessionError::Protocol(
218 "park() requires a persistent runtime (store is not set)".to_string(),
219 )
220 })?;
221 let session_id = self.state.session_id.clone();
222 let policy = self.policy.clone();
223 let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
225 let result = store.commit_runtime_state(commit).await.map_err(|err| {
226 SessionError::Protocol(format!("failed to persist runtime state: {err}"))
227 })?;
228 self.state.apply_persisted_commit_result(result);
229 Ok(ParkedSession {
234 session_id,
235 store,
236 policy,
237 })
238 }
239
240 pub async fn resume(
245 parked: ParkedSession,
246 env: &RuntimeEnvironment,
247 ) -> Result<Self, SessionError> {
248 let loaded = match env.residency {
254 Residency::KeepAll => {
255 crate::store::load_persisted_session_state(parked.store.as_ref()).await
256 }
257 Residency::ActivePathOnly => {
258 crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
259 .await
260 }
261 }
262 .map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
263 let state = loaded.unwrap_or_else(|| PersistedSessionState {
264 session_id: parked.session_id.clone(),
265 policy: parked.policy.clone(),
266 ..PersistedSessionState::default()
267 });
268 Self::from_environment(env, parked.policy, state, Some(parked.store)).await
269 }
270
271 pub async fn get_historic_node(
276 &self,
277 node_id: &str,
278 ) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
279 if let Some(node) = self.state.session_graph.find_node(node_id) {
280 return Ok(Some(node.clone()));
281 }
282 let store = self.services.store.clone().ok_or_else(|| {
283 SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
284 })?;
285 store
286 .load_node(node_id)
287 .await
288 .map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
289 }
290
291 pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
307 let store = self.services.store.clone().ok_or_else(|| {
308 SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
309 })?;
310 let Some(read) = store
311 .load_session(crate::store::SessionReadScope::FullGraph)
312 .await
313 .map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
314 else {
315 return Ok(Vec::new());
316 };
317 let active: std::collections::HashSet<&str> = read
318 .graph
319 .active_path_nodes()
320 .iter()
321 .map(|node| node.node_id.as_str())
322 .collect();
323 Ok(read
324 .graph
325 .nodes
326 .iter()
327 .filter(|node| !active.contains(node.node_id.as_str()))
328 .map(|node| node.node_id.clone())
329 .collect())
330 }
331}