use super::*;
impl LashRuntime {
pub(super) async fn from_host_state(
policy: SessionPolicy,
host: RuntimeHost,
services: RuntimeServices,
mut state: PersistedSessionState,
) -> Result<Self, SessionError> {
if state.session_id.is_empty() {
state.session_id = uuid::Uuid::new_v4().to_string();
}
if state.policy.provider.kind() == "unconfigured" {
state.policy = policy.clone();
}
normalize_session_graph(&mut state);
if policy.max_context_tokens.is_none() {
return Err(SessionError::Protocol(
"session policy missing max_context_tokens; hosts must supply explicit model metadata"
.to_string(),
));
}
let services = services.with_attachment_store(Arc::clone(&host.core.attachment_store));
let mut session = Session::new(
services.clone(),
&state.session_id,
state.policy.execution_mode.clone(),
)
.await?;
if let Some(tool_state) = state.tool_state_snapshot.clone()
&& let Err(err) = session.plugins().tool_registry().apply_state(tool_state)
{
tracing::warn!("failed to restore tool state from checkpoint: {err}");
}
if let Some(snapshot) = state.plugin_snapshot.clone() {
session
.plugins()
.restore(&snapshot)
.map_err(|err| SessionError::Protocol(err.to_string()))?;
}
let mode_session = Arc::clone(session.plugins().mode_session());
let session_id = state.session_id.clone();
mode_session
.restore_session(
crate::plugin::ModeSessionContext::new(&mut session, &session_id),
&state,
)
.await?;
state.discard_runtime_snapshots();
session
.plugins()
.emit_runtime_event(crate::PluginRuntimeEvent::SessionRestored(
crate::SessionReadView::from_persisted_state(&state),
))
.await;
let mode_turn_options = state.mode_turn_options.clone();
Ok(Self {
session: Some(session),
policy,
host,
services,
state,
runtime_scope_id: Arc::<str>::from(uuid::Uuid::new_v4().to_string()),
managed_sessions: Arc::new(Mutex::new(HashMap::new())),
active_handoff_continuations: Arc::new(Mutex::new(HashMap::new())),
managed_turns: Arc::new(Mutex::new(HashMap::new())),
mode_turn_options,
shared_token_ledger: Arc::new(std::sync::Mutex::new(Vec::new())),
background_sync_needed: Arc::new(AtomicBool::new(false)),
pending_first_turn_inputs: Arc::new(std::sync::Mutex::new(HashMap::new())),
turn_phase_probe: None,
})
}
pub async fn from_embedded_state(
policy: SessionPolicy,
host: EmbeddedRuntimeHost,
services: RuntimeServices,
state: PersistedSessionState,
) -> Result<Self, SessionError> {
Self::from_host_state(policy, host.into(), services, state).await
}
pub async fn from_background_state(
policy: SessionPolicy,
host: BackgroundRuntimeHost,
services: RuntimeServices,
state: PersistedSessionState,
) -> Result<Self, SessionError> {
Self::from_host_state(policy, host.into(), services, state).await
}
pub async fn from_persistent_embedded_state(
policy: SessionPolicy,
host: EmbeddedRuntimeHost,
services: PersistentRuntimeServices,
state: PersistedSessionState,
) -> Result<Self, SessionError> {
Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
}
pub async fn from_persistent_background_state(
policy: SessionPolicy,
host: BackgroundRuntimeHost,
services: PersistentRuntimeServices,
state: PersistedSessionState,
) -> Result<Self, SessionError> {
Self::from_host_state(policy, host.into(), services.into_runtime_services(), state).await
}
pub async fn from_environment(
env: &RuntimeEnvironment,
policy: SessionPolicy,
mut state: PersistedSessionState,
store: Option<Arc<dyn crate::store::RuntimePersistence>>,
) -> Result<Self, SessionError> {
if matches!(env.residency, Residency::ActivePathOnly) && store.is_none() {
return Err(SessionError::Protocol(
"Residency::ActivePathOnly requires a persistent store — \
without one, trimmed orphans are irrecoverable"
.to_string(),
));
}
normalize_session_graph(&mut state);
apply_residency_on_load(&mut state, env.residency);
let plugin_host = env.plugin_host.as_ref().ok_or_else(|| {
SessionError::Protocol(
"RuntimeEnvironment.plugin_host is required for from_environment".to_string(),
)
})?;
let plugin_session = plugin_host
.build_session(
state.session_id.as_str(),
policy.execution_mode.clone(),
policy.standard_context_approach.clone(),
state.plugin_snapshot.as_ref(),
)
.map_err(|err| SessionError::Protocol(err.to_string()))?;
let core = RuntimeCoreConfig {
attachment_store: Arc::clone(&env.attachment_store),
prompt: env.prompt.clone(),
trace_sink: env.trace_sink.clone(),
trace_level: env.trace_level,
trace_context: env.trace_context.clone(),
termination: env.termination.clone(),
};
let mut embedded = EmbeddedRuntimeHost::new(core);
if let Some(factory) = env.session_store_factory.as_ref() {
embedded = embedded.with_session_store_factory(Arc::clone(factory));
}
let runtime = if let Some(store) = store {
let services = PersistentRuntimeServices::new_with_bridges(
plugin_session,
crate::session::TurnInjectionBridge::new(),
crate::session::TurnInputInjectionBridge::new(),
store,
);
match env.background_task_host.as_ref() {
Some(executor) => {
let host = BackgroundRuntimeHost::new(embedded, Arc::clone(executor));
Self::from_persistent_background_state(policy, host, services, state).await?
}
None => {
Self::from_persistent_embedded_state(policy, embedded, services, state).await?
}
}
} else {
let services = RuntimeServices::new(plugin_session);
match env.background_task_host.as_ref() {
Some(executor) => {
let host = BackgroundRuntimeHost::new(embedded, Arc::clone(executor));
Self::from_background_state(policy, host, services, state).await?
}
None => Self::from_embedded_state(policy, embedded, services, state).await?,
}
};
Ok(runtime)
}
pub async fn park(mut self) -> Result<ParkedSession, SessionError> {
let store = self.services.store.clone().ok_or_else(|| {
SessionError::Protocol(
"park() requires a persistent runtime (store is not set)".to_string(),
)
})?;
let session_id = self.state.session_id.clone();
let policy = self.policy.clone();
let commit = crate::store::RuntimeCommit::persisted_state(&self.state, &[]);
let result = store.commit_runtime_state(commit).await.map_err(|err| {
SessionError::Protocol(format!("failed to persist runtime state: {err}"))
})?;
self.state.apply_persisted_commit_result(result);
Ok(ParkedSession {
session_id,
store,
policy,
})
}
pub async fn resume(
parked: ParkedSession,
env: &RuntimeEnvironment,
) -> Result<Self, SessionError> {
let loaded = match env.residency {
Residency::KeepAll => {
crate::store::load_persisted_session_state(parked.store.as_ref()).await
}
Residency::ActivePathOnly => {
crate::store::load_persisted_session_state_active_path(parked.store.as_ref(), None)
.await
}
}
.map_err(|err| SessionError::Protocol(format!("failed to load runtime state: {err}")))?;
let state = loaded.unwrap_or_else(|| PersistedSessionState {
session_id: parked.session_id.clone(),
policy: parked.policy.clone(),
..PersistedSessionState::default()
});
Self::from_environment(env, parked.policy, state, Some(parked.store)).await
}
pub async fn get_historic_node(
&self,
node_id: &str,
) -> Result<Option<crate::SessionNodeRecord>, SessionError> {
if let Some(node) = self.state.session_graph.find_node(node_id) {
return Ok(Some(node.clone()));
}
let store = self.services.store.clone().ok_or_else(|| {
SessionError::Protocol("get_historic_node() requires a persistent runtime".to_string())
})?;
store
.load_node(node_id)
.await
.map_err(|err| SessionError::Protocol(format!("failed to load historic node: {err}")))
}
pub async fn orphaned_node_ids(&self) -> Result<Vec<String>, SessionError> {
let store = self.services.store.clone().ok_or_else(|| {
SessionError::Protocol("orphaned_node_ids() requires a persistent runtime".to_string())
})?;
let Some(read) = store
.load_session(crate::store::SessionReadScope::FullGraph)
.await
.map_err(|err| SessionError::Protocol(format!("failed to load full graph: {err}")))?
else {
return Ok(Vec::new());
};
let active: std::collections::HashSet<&str> = read
.graph
.active_path_nodes()
.iter()
.map(|node| node.node_id.as_str())
.collect();
Ok(read
.graph
.nodes
.iter()
.filter(|node| !active.contains(node.node_id.as_str()))
.map(|node| node.node_id.clone())
.collect())
}
}