use std::collections::BTreeSet;
use std::fs;
use std::path::{Path, PathBuf};
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use crate::paths::{state::StateLayout, write};
use crate::state::{pod_identity, session as session_state};
/// Version stamped into every on-disk presence record; `load_presence_record`
/// rejects any other value.
const MACHINE_PRESENCE_SCHEMA_VERSION: u32 = 1;
/// Transport label advertised in presence views: machines discover each other
/// through JSON records in a shared filesystem directory.
const FILESYSTEM_RELAY_TRANSPORT: &str = "filesystem_relay";
/// Default lease TTL (8 hours) applied when no explicit TTL is given and the
/// runtime reports `Ready` or `Degraded` health.
pub(crate) const DEFAULT_READY_LEASE_SECS: u64 = 8 * 60 * 60;
/// Default lease TTL (15 minutes) applied when no explicit TTL is given and
/// the runtime reports `Idle` health.
pub(crate) const DEFAULT_IDLE_LEASE_SECS: u64 = 15 * 60;
/// Self-reported health of a machine runtime, serialized in snake_case
/// ("ready" / "idle" / "degraded") inside presence records.
///
/// Health also selects the default lease TTL: `Idle` uses the short
/// `DEFAULT_IDLE_LEASE_SECS`, while `Ready` and `Degraded` use
/// `DEFAULT_READY_LEASE_SECS` (see `effective_lease_ttl_secs`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum MachineRuntimeHealth {
    Ready,
    Idle,
    Degraded,
}
/// Presence summary for the local machine plus every peer observed in the
/// pod's filesystem-relay presence directory.
///
/// `status` is one of "active", "expired", "invalid", or "missing" (set by
/// `resolve_machine_presence_view`). Optional fields are populated only when
/// the corresponding identity or lease data exists and are omitted from the
/// serialized JSON otherwise.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MachinePresenceView {
    pub status: &'static str,
    // Always `FILESYSTEM_RELAY_TRANSPORT` when a pod identity exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub transport: Option<&'static str>,
    // Display form of the pod's presence directory root.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub relay_root: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pod_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub machine_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trust_class: Option<pod_identity::MachineTrustClass>,
    // Path of the local machine's presence record (existing or default).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub presence_path: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub first_seen_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_heartbeat_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lease_ttl_secs: Option<u64>,
    // last_heartbeat + ttl (saturating); lease is expired once now >= this.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lease_expires_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub available_profiles: Vec<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub capabilities: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub runtime_health: Option<MachineRuntimeHealth>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub reachable_projects: Vec<String>,
    // All records found in the pod's presence directory, including this
    // machine's own record and invalid entries.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub observed_machines: Vec<MachinePresencePeerView>,
    // Human-readable explanation for "missing" / "invalid" statuses.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
/// One record from the pod's presence directory, as seen by the local
/// machine. Built by `build_observed_machine_views` for every entry,
/// including the local machine's own record.
///
/// `status` is "active", "expired", or "invalid"; invalid entries carry only
/// the machine id (derived from the file stem), the path, and a `reason`.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MachinePresencePeerView {
    pub status: &'static str,
    pub machine_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub display_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trust_class: Option<pod_identity::MachineTrustClass>,
    // True when this record belongs to the locally-declared machine identity.
    pub current_machine: bool,
    pub presence_path: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub first_seen_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_heartbeat_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lease_ttl_secs: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub lease_expires_at_epoch_s: Option<u64>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub available_profiles: Vec<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub capabilities: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub runtime_health: Option<MachineRuntimeHealth>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub reachable_projects: Vec<String>,
    // Parse-failure description for "invalid" entries; `None` otherwise.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reason: Option<String>,
}
/// Whether this runtime can claim work for the active profile/project, built
/// by `build_execution_context_view` from the machine identity and presence.
///
/// `status` is "eligible" (no blockers), "unavailable" (no machine identity
/// declared), or "blocked" (one or more blockers present).
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MachineExecutionContextView {
    pub status: &'static str,
    pub profile_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub project_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pod_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub machine_id: Option<String>,
    // Machine identity lists the active profile as available.
    pub profile_available: bool,
    // Current project appears in the presence lease's reachable_projects
    // (vacuously true when no project is set).
    pub project_reachable: bool,
    // The local presence lease has status "active".
    pub machine_presence_active: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_session_stale: Option<bool>,
    // Fixed guidance strings for schedulers; not computed.
    pub policy_gate: &'static str,
    pub claim_transfer: &'static str,
    pub workspace_state_shared: bool,
    // Human-readable reasons this context is not eligible.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub blockers: Vec<String>,
    pub required_remote_takeover_inputs: Vec<&'static str>,
}
/// Aggregated go/no-go view for cross-machine takeover of a stale session,
/// built by `build_takeover_preconditions_view`.
///
/// `status` is "unavailable" (no session lifecycle), "eligible" (no
/// blockers), or "blocked". `session_status` is "missing", "stale", or
/// "active".
#[derive(Debug, Clone, Serialize)]
pub(crate) struct MachineTakeoverPreconditionsView {
    pub status: &'static str,
    pub session_status: &'static str,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_lifecycle: Option<session_state::SessionLifecycle>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_owner_kind: Option<session_state::SessionOwnerKind>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_actor_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_stale: Option<bool>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub session_ownership_conflict: Option<bool>,
    // Status of the local MachineExecutionContextView.
    pub local_execution_context_status: &'static str,
    // "blocked" when any blocking escalation is active, else "review_required".
    pub policy_gate_status: &'static str,
    pub blocking_escalation_active: bool,
    pub protected_writes_require_current_owner: bool,
    // Always true: claim transfer stays extension-owned.
    pub extension_claim_transfer_required: bool,
    pub workspace_state_shared: bool,
    // Union of session-, execution-context-, and policy-level blockers.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub blockers: Vec<String>,
}
/// On-disk JSON schema of one machine presence record (one file per machine
/// under the pod's presence root). Written by `record_machine_presence`,
/// read by `load_presence_record`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MachinePresenceRecord {
    // Must equal MACHINE_PRESENCE_SCHEMA_VERSION; other values are rejected.
    schema_version: u32,
    pod_name: String,
    machine_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    display_name: Option<String>,
    trust_class: pod_identity::MachineTrustClass,
    // Preserved across heartbeats from the first record ever written.
    first_seen_epoch_s: u64,
    last_heartbeat_at_epoch_s: u64,
    // Lease expires at last_heartbeat_at_epoch_s + lease_ttl_secs.
    lease_ttl_secs: u64,
    // Deduplicated, sorted, empty strings removed (see dedupe_sorted).
    available_profiles: Vec<String>,
    capabilities: Vec<String>,
    runtime_health: MachineRuntimeHealth,
    // Accumulates every locality id ever advertised by this machine.
    reachable_projects: Vec<String>,
}
/// A successfully parsed presence record together with the file it came from.
#[derive(Debug, Clone)]
struct LoadedMachinePresenceRecord {
    record: MachinePresenceRecord,
    path: PathBuf,
}
/// A presence file that exists but failed to parse or validate.
#[derive(Debug, Clone)]
struct InvalidMachinePresenceRecord {
    // Derived from the file stem, since the content is unreadable.
    machine_id: String,
    path: PathBuf,
    reason: String,
}
/// One entry of a presence-directory scan: either a parseable record or a
/// broken file kept so callers can surface the failure.
#[derive(Debug, Clone)]
enum PresenceRecordEntry {
    Loaded(LoadedMachinePresenceRecord),
    Invalid(InvalidMachinePresenceRecord),
}
/// Writes (or refreshes) this machine's presence record under the active
/// pod's presence root.
///
/// Silently a no-op when no pod or machine identity is declared — there is
/// nothing to advertise yet. A prior record, when parseable, contributes the
/// original `first_seen` timestamp and the accumulated set of reachable
/// projects; parse failures of the prior record are deliberately ignored so
/// a corrupt file never blocks a fresh heartbeat.
///
/// # Errors
/// Fails when identity resolution, directory creation, or the final write
/// fails.
pub(crate) fn record_machine_presence(
    layout: &StateLayout,
    locality_id: Option<&str>,
    runtime_health: MachineRuntimeHealth,
    lease_ttl_secs: Option<u64>,
    observed_at_epoch_s: u64,
) -> Result<()> {
    let pod = match pod_identity::resolve_active_identity(layout)? {
        Some(pod) => pod,
        None => return Ok(()),
    };
    let machine = match pod_identity::resolve_active_machine_identity(layout)? {
        Some(machine) => machine,
        None => return Ok(()),
    };
    let path = layout.pod_machine_presence_path(&pod.name, &machine.id)?;
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
    }
    // Best-effort read of the previous heartbeat; errors are treated as "no
    // prior record".
    let previous = load_presence_record(&path).ok().flatten();
    let first_seen_epoch_s = match previous.as_ref() {
        Some(loaded) => loaded.record.first_seen_epoch_s,
        None => observed_at_epoch_s,
    };
    let mut projects = match previous.as_ref() {
        Some(loaded) => loaded.record.reachable_projects.clone(),
        None => Vec::new(),
    };
    // The current locality joins the historical set; duplicates collapse below.
    projects.extend(locality_id.map(str::to_owned));
    let reachable_projects = dedupe_sorted(projects);
    let record = MachinePresenceRecord {
        schema_version: MACHINE_PRESENCE_SCHEMA_VERSION,
        pod_name: pod.name.clone(),
        machine_id: machine.id.clone(),
        display_name: machine.display_name.clone(),
        trust_class: machine.trust_class,
        first_seen_epoch_s,
        last_heartbeat_at_epoch_s: observed_at_epoch_s,
        lease_ttl_secs: effective_lease_ttl_secs(lease_ttl_secs, runtime_health),
        available_profiles: dedupe_sorted(machine.available_profiles.clone()),
        capabilities: dedupe_sorted(machine.capabilities.clone()),
        runtime_health,
        reachable_projects,
    };
    // Serializing a plain struct cannot fail; a failure here is a bug.
    let payload = serde_json::to_string_pretty(&record).expect("machine presence serialization");
    write::replace_text(&path, &payload, None)
        .with_context(|| format!("failed to write {}", path.display()))?;
    Ok(())
}
/// Builds the local machine's presence view: resolves the active pod and
/// machine identities, scans the pod's presence directory for peers, and
/// classifies the local lease as "active", "expired", "invalid", or
/// "missing".
pub(crate) fn resolve_machine_presence_view(layout: &StateLayout) -> Result<MachinePresenceView> {
    let now_epoch_s = session_state::now_epoch_s()?;
    let active_pod = pod_identity::resolve_active_identity(layout)?;
    let active_machine = pod_identity::resolve_active_machine_identity(layout)?;
    // Display form of the presence root, reported even when the local record
    // is missing so callers can locate the relay directory.
    let relay_root = active_pod
        .as_ref()
        .map(|pod| layout.pod_machine_presence_root(&pod.name))
        .transpose()?
        .map(|path| path.display().to_string());
    // Without a pod there is no presence directory to scan.
    let observed_records = match active_pod.as_ref() {
        Some(pod) => load_presence_records(layout, &pod.name, now_epoch_s)?,
        None => Vec::new(),
    };
    let observed_machines = build_observed_machine_views(
        &observed_records,
        active_machine.as_ref().map(|machine| machine.id.as_str()),
        now_epoch_s,
    );
    // No pod identity: "missing" view with only the peer list populated.
    let Some(pod) = active_pod else {
        return Ok(MachinePresenceView {
            status: "missing",
            transport: None,
            relay_root: None,
            pod_name: None,
            machine_id: None,
            display_name: None,
            trust_class: None,
            presence_path: None,
            first_seen_epoch_s: None,
            last_heartbeat_at_epoch_s: None,
            lease_ttl_secs: None,
            lease_expires_at_epoch_s: None,
            available_profiles: Vec::new(),
            capabilities: Vec::new(),
            runtime_health: None,
            reachable_projects: Vec::new(),
            observed_machines,
            reason: Some(format!(
                "no active pod identity is declared for profile `{}`",
                layout.profile()
            )),
        });
    };
    // Pod exists but no machine identity: still "missing", though the
    // transport, relay root, and pod name are known.
    let Some(machine) = active_machine else {
        return Ok(MachinePresenceView {
            status: "missing",
            transport: Some(FILESYSTEM_RELAY_TRANSPORT),
            relay_root,
            pod_name: Some(pod.name.clone()),
            machine_id: None,
            display_name: None,
            trust_class: None,
            presence_path: None,
            first_seen_epoch_s: None,
            last_heartbeat_at_epoch_s: None,
            lease_ttl_secs: None,
            lease_expires_at_epoch_s: None,
            available_profiles: Vec::new(),
            capabilities: Vec::new(),
            runtime_health: None,
            reachable_projects: Vec::new(),
            observed_machines,
            reason: Some(format!(
                "no machine identity is declared yet for pod `{}`",
                pod.name
            )),
        });
    };
    // Path this machine's record *would* live at, used when none exists yet.
    let default_path = layout
        .pod_machine_presence_path(&pod.name, &machine.id)?
        .display()
        .to_string();
    // Find this machine's own entry among the observed records, keeping the
    // loaded (Ok) / invalid (Err) distinction.
    let current = observed_records.into_iter().find_map(|entry| match entry {
        PresenceRecordEntry::Loaded(record) if record.record.machine_id == machine.id => {
            Some(Ok(record))
        }
        PresenceRecordEntry::Invalid(record) if record.machine_id == machine.id => {
            Some(Err(record))
        }
        _ => None,
    });
    match current {
        // A parseable record: "active" until the lease deadline passes.
        Some(Ok(record)) => {
            let lease_expires_at_epoch_s = lease_expires_at_epoch_s(&record.record);
            let status = if lease_expires_at_epoch_s <= now_epoch_s {
                "expired"
            } else {
                "active"
            };
            Ok(MachinePresenceView {
                status,
                transport: Some(FILESYSTEM_RELAY_TRANSPORT),
                relay_root,
                pod_name: Some(record.record.pod_name),
                machine_id: Some(record.record.machine_id),
                display_name: record.record.display_name,
                trust_class: Some(record.record.trust_class),
                presence_path: Some(record.path.display().to_string()),
                first_seen_epoch_s: Some(record.record.first_seen_epoch_s),
                last_heartbeat_at_epoch_s: Some(record.record.last_heartbeat_at_epoch_s),
                lease_ttl_secs: Some(record.record.lease_ttl_secs),
                lease_expires_at_epoch_s: Some(lease_expires_at_epoch_s),
                available_profiles: record.record.available_profiles,
                capabilities: record.record.capabilities,
                runtime_health: Some(record.record.runtime_health),
                reachable_projects: record.record.reachable_projects,
                observed_machines,
                reason: None,
            })
        }
        // The record exists on disk but failed to parse; fall back to the
        // declared machine identity for the descriptive fields.
        Some(Err(record)) => Ok(MachinePresenceView {
            status: "invalid",
            transport: Some(FILESYSTEM_RELAY_TRANSPORT),
            relay_root,
            pod_name: Some(pod.name),
            machine_id: Some(machine.id),
            display_name: machine.display_name,
            trust_class: Some(machine.trust_class),
            presence_path: Some(record.path.display().to_string()),
            first_seen_epoch_s: None,
            last_heartbeat_at_epoch_s: None,
            lease_ttl_secs: None,
            lease_expires_at_epoch_s: None,
            available_profiles: machine.available_profiles,
            capabilities: machine.capabilities,
            runtime_health: None,
            reachable_projects: Vec::new(),
            observed_machines,
            reason: Some(record.reason),
        }),
        // No record at all: this machine has never heartbeated (or the file
        // was removed); report the default path a heartbeat would use.
        None => Ok(MachinePresenceView {
            status: "missing",
            transport: Some(FILESYSTEM_RELAY_TRANSPORT),
            relay_root,
            pod_name: Some(pod.name),
            machine_id: Some(machine.id.clone()),
            display_name: machine.display_name,
            trust_class: Some(machine.trust_class),
            presence_path: Some(default_path),
            first_seen_epoch_s: None,
            last_heartbeat_at_epoch_s: None,
            lease_ttl_secs: None,
            lease_expires_at_epoch_s: None,
            available_profiles: machine.available_profiles,
            capabilities: machine.capabilities,
            runtime_health: None,
            reachable_projects: Vec::new(),
            observed_machines,
            reason: Some(format!(
                "no leased machine presence record exists yet for machine `{}`",
                machine.id
            )),
        }),
    }
}
/// Derives the local execution-context view from the machine identity, the
/// presence view, and (optionally) the current session projection.
///
/// Eligibility requires: a declared machine identity, the active profile
/// listed as available, the current project (if any) advertised in the
/// presence lease, and an "active" presence lease. Each failed check appends
/// a human-readable blocker; status is "unavailable" when the identity is
/// missing, "eligible" when no blockers exist, "blocked" otherwise.
pub(crate) fn build_execution_context_view(
    layout: &StateLayout,
    locality_id: Option<&str>,
    machine_identity: &pod_identity::MachineIdentityView,
    machine_presence: &MachinePresenceView,
    session: Option<&session_state::SessionLifecycleProjection>,
) -> MachineExecutionContextView {
    let profile_id = layout.profile().to_string();
    let profile_available = machine_identity
        .available_profiles
        .iter()
        .any(|profile| profile == &profile_id);
    // With no project in play, reachability is vacuously true.
    let project_reachable = match locality_id {
        Some(locality_id) => machine_presence
            .reachable_projects
            .iter()
            .any(|project| project == locality_id),
        None => true,
    };
    let machine_presence_active = machine_presence.status == "active";
    let current_session_stale = session.and_then(|session| session.stale);
    let mut blockers = Vec::new();
    match machine_identity.status {
        "declared" => {}
        "missing" => blockers.push(
            "no machine identity is declared for the active pod, so this runtime cannot advertise a takeover-safe execution context".to_owned(),
        ),
        _ => blockers.push("machine identity is not in a usable state".to_owned()),
    }
    if !profile_available {
        blockers.push(format!(
            "machine `{}` does not declare profile `{profile_id}` as available",
            machine_identity.id.as_deref().unwrap_or("unknown")
        ));
    }
    if !project_reachable {
        blockers.push(
            "the current project is not advertised in the active machine presence lease, so this runtime cannot prove local execution reachability".to_owned(),
        );
    }
    if !machine_presence_active {
        blockers.push(
            "the local machine presence lease is not active, so schedulers should not treat this runtime as eligible for new work".to_owned(),
        );
    }
    MachineExecutionContextView {
        // "unavailable" (missing identity) outranks "blocked" even though
        // both carry blockers.
        status: if blockers.is_empty() {
            "eligible"
        } else if machine_identity.status == "missing" {
            "unavailable"
        } else {
            "blocked"
        },
        profile_id,
        project_id: locality_id.map(str::to_owned),
        // Prefer the leased presence data; fall back to the declared identity.
        pod_name: machine_presence
            .pod_name
            .clone()
            .or_else(|| machine_identity.pod_name.clone()),
        machine_id: machine_identity
            .id
            .clone()
            .or_else(|| machine_presence.machine_id.clone()),
        profile_available,
        project_reachable,
        machine_presence_active,
        current_session_stale,
        policy_gate: "check policy_projection before autonomous takeover or protected writes",
        claim_transfer: "extension-owned claim transfer or supersession is still required",
        workspace_state_shared: false,
        blockers,
        required_remote_takeover_inputs: vec![
            "stale_session",
            "policy_projection_allows_new_actor",
            "local_execution_context",
            "extension_claim_transfer",
        ],
    }
}
/// Combines the session projection, execution context, and escalation state
/// into a single go/no-go view for cross-machine takeover.
///
/// Takeover is only considered when a session lifecycle exists, is
/// `Autonomous`, and its lease is stale; every failed condition (plus all
/// execution-context blockers and any active blocking escalations) is
/// appended as a blocker. Status: "unavailable" without a lifecycle,
/// "eligible" with no blockers, "blocked" otherwise.
pub(crate) fn build_takeover_preconditions_view(
    execution_context: &MachineExecutionContextView,
    session: Option<&session_state::SessionLifecycleProjection>,
    blocking_escalation_active: bool,
    blocking_escalation_count: usize,
    protected_writes_require_current_owner: bool,
) -> MachineTakeoverPreconditionsView {
    // A projection with no lifecycle is treated the same as no projection.
    let session_status = match session {
        Some(session) if session.lifecycle.is_none() => "missing",
        Some(session) if session.stale == Some(true) => "stale",
        Some(_) => "active",
        None => "missing",
    };
    let policy_gate_status = if blocking_escalation_active {
        "blocked"
    } else {
        "review_required"
    };
    let mut blockers = Vec::new();
    // Arm order matters: only the first matching condition produces a
    // blocker (missing > non-autonomous > non-stale).
    match session {
        None => blockers.push(
            "no session lifecycle is active in this workspace, so cross-machine takeover does not apply yet".to_owned(),
        ),
        Some(session) if session.lifecycle.is_none() => blockers.push(
            "no session lifecycle is active in this workspace, so cross-machine takeover does not apply yet".to_owned(),
        ),
        Some(session)
            if session.lifecycle != Some(session_state::SessionLifecycle::Autonomous) =>
        {
            blockers.push(
                "the current session is not autonomous, so stale-worker takeover must stay blocked".to_owned(),
            );
        }
        Some(session) if session.stale != Some(true) => blockers.push(
            "the current autonomous session lease is not stale, so ownership must remain with the active machine".to_owned(),
        ),
        _ => {}
    }
    if let Some(session) = session {
        if session.ownership_conflict == Some(true) {
            blockers.push(
                "the candidate actor does not match the active owner, so takeover must wait for stale-session handling or explicit clear".to_owned(),
            );
        }
    }
    // Execution-context blockers apply verbatim.
    blockers.extend(execution_context.blockers.iter().cloned());
    if blocking_escalation_active {
        blockers.push(format!(
            "{blocking_escalation_count} blocking escalation(s) are active, so policy must be resolved before takeover can proceed"
        ));
    }
    MachineTakeoverPreconditionsView {
        status: match session {
            None => "unavailable",
            Some(session) if session.lifecycle.is_none() => "unavailable",
            _ if blockers.is_empty() => "eligible",
            _ => "blocked",
        },
        session_status,
        session_lifecycle: session.and_then(|session| session.lifecycle),
        session_owner_kind: session.and_then(|session| session.owner_kind),
        session_actor_id: session.and_then(|session| session.actor_id.clone()),
        session_stale: session.and_then(|session| session.stale),
        session_ownership_conflict: session.and_then(|session| session.ownership_conflict),
        local_execution_context_status: execution_context.status,
        policy_gate_status,
        blocking_escalation_active,
        protected_writes_require_current_owner,
        extension_claim_transfer_required: true,
        workspace_state_shared: execution_context.workspace_state_shared,
        blockers,
    }
}
/// Resolves the lease TTL: an explicit caller-supplied TTL always wins;
/// otherwise the runtime health selects a default (short for `Idle`, long
/// for `Ready`/`Degraded`).
fn effective_lease_ttl_secs(
    lease_ttl_secs: Option<u64>,
    runtime_health: MachineRuntimeHealth,
) -> u64 {
    if let Some(ttl) = lease_ttl_secs {
        return ttl;
    }
    match runtime_health {
        MachineRuntimeHealth::Idle => DEFAULT_IDLE_LEASE_SECS,
        MachineRuntimeHealth::Ready | MachineRuntimeHealth::Degraded => DEFAULT_READY_LEASE_SECS,
    }
}
/// Scans the pod's presence directory and returns every `*.json` record —
/// parseable or not — sorted active-first, then expired, then invalid, with
/// ties broken by machine id.
///
/// A missing presence directory is not an error: it simply means no machine
/// has ever heartbeated, so an empty list is returned.
///
/// # Errors
/// Fails when resolving the presence root fails or a directory entry cannot
/// be stat'd; individual unparseable files are reported as
/// `PresenceRecordEntry::Invalid` rather than as errors.
fn load_presence_records(
    layout: &StateLayout,
    pod_name: &str,
    now_epoch_s: u64,
) -> Result<Vec<PresenceRecordEntry>> {
    let root = layout.pod_machine_presence_root(pod_name)?;
    let Ok(entries) = fs::read_dir(&root) else {
        return Ok(Vec::new());
    };
    let mut records = Vec::new();
    for entry in entries {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue;
        }
        // `DirEntry::path` allocates a fresh PathBuf; compute it once.
        let path = entry.path();
        if path.extension().and_then(|value| value.to_str()) != Some("json") {
            continue;
        }
        records.push(match load_presence_record(&path) {
            Ok(Some(record)) => PresenceRecordEntry::Loaded(record),
            // Empty/vanished files are skipped rather than surfaced as invalid.
            Ok(None) => continue,
            // Broken files are kept so callers can report them instead of
            // silently hiding a peer.
            Err(error) => PresenceRecordEntry::Invalid(InvalidMachinePresenceRecord {
                machine_id: path
                    .file_stem()
                    .and_then(|value| value.to_str())
                    .unwrap_or("unknown")
                    .to_owned(),
                reason: format!(
                    "failed to parse machine presence at {}: {error:#}",
                    path.display()
                ),
                path,
            }),
        });
    }
    // `sort_by_cached_key` computes each (rank, machine_id) key exactly once,
    // instead of cloning the machine id on every comparison as `sort_by` with
    // `presence_sort_key` did; like `sort_by`, it is a stable sort.
    records.sort_by_cached_key(|record| presence_sort_key(record, now_epoch_s));
    Ok(records)
}
/// Reads and validates a single presence record file.
///
/// A file that does not exist or is blank yields `Ok(None)` ("no record
/// yet"); unreadable content, malformed JSON, or an unexpected schema
/// version is an error.
fn load_presence_record(path: &Path) -> Result<Option<LoadedMachinePresenceRecord>> {
    let contents = match fs::read_to_string(path) {
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(error) => {
            return Err(error).with_context(|| format!("failed to read {}", path.display()))
        }
        Ok(contents) => contents,
    };
    if contents.trim().is_empty() {
        return Ok(None);
    }
    let record = serde_json::from_str::<MachinePresenceRecord>(&contents)
        .with_context(|| format!("failed to parse {}", path.display()))?;
    if record.schema_version == MACHINE_PRESENCE_SCHEMA_VERSION {
        Ok(Some(LoadedMachinePresenceRecord {
            record,
            path: path.to_path_buf(),
        }))
    } else {
        bail!(
            "unsupported machine presence schema version {} in {}; expected {}",
            record.schema_version,
            path.display(),
            MACHINE_PRESENCE_SCHEMA_VERSION
        );
    }
}
/// Converts scanned presence entries into peer views, preserving the input
/// order. Loaded records become "active"/"expired" (by lease deadline vs.
/// `now_epoch_s`); invalid entries become "invalid" with only the file-stem
/// machine id, the path, and the parse-failure reason.
fn build_observed_machine_views(
    records: &[PresenceRecordEntry],
    current_machine_id: Option<&str>,
    now_epoch_s: u64,
) -> Vec<MachinePresencePeerView> {
    records
        .iter()
        .map(|record| match record {
            PresenceRecordEntry::Loaded(record) => {
                let lease_expires_at_epoch_s = lease_expires_at_epoch_s(&record.record);
                MachinePresencePeerView {
                    status: if lease_expires_at_epoch_s <= now_epoch_s {
                        "expired"
                    } else {
                        "active"
                    },
                    machine_id: record.record.machine_id.clone(),
                    display_name: record.record.display_name.clone(),
                    trust_class: Some(record.record.trust_class),
                    // Mark the local machine's own record among the peers.
                    current_machine: current_machine_id
                        .is_some_and(|machine_id| machine_id == record.record.machine_id),
                    presence_path: record.path.display().to_string(),
                    first_seen_epoch_s: Some(record.record.first_seen_epoch_s),
                    last_heartbeat_at_epoch_s: Some(record.record.last_heartbeat_at_epoch_s),
                    lease_ttl_secs: Some(record.record.lease_ttl_secs),
                    lease_expires_at_epoch_s: Some(lease_expires_at_epoch_s),
                    available_profiles: record.record.available_profiles.clone(),
                    capabilities: record.record.capabilities.clone(),
                    runtime_health: Some(record.record.runtime_health),
                    reachable_projects: record.record.reachable_projects.clone(),
                    reason: None,
                }
            }
            // Unparseable file: only identity-by-filename and the reason.
            PresenceRecordEntry::Invalid(record) => MachinePresencePeerView {
                status: "invalid",
                machine_id: record.machine_id.clone(),
                display_name: None,
                trust_class: None,
                current_machine: current_machine_id
                    .is_some_and(|machine_id| machine_id == record.machine_id),
                presence_path: record.path.display().to_string(),
                first_seen_epoch_s: None,
                last_heartbeat_at_epoch_s: None,
                lease_ttl_secs: None,
                lease_expires_at_epoch_s: None,
                available_profiles: Vec::new(),
                capabilities: Vec::new(),
                runtime_health: None,
                reachable_projects: Vec::new(),
                reason: Some(record.reason.clone()),
            },
        })
        .collect()
}
/// Absolute expiry instant of a record's lease: last heartbeat plus TTL,
/// saturating so an absurd TTL cannot overflow `u64`.
fn lease_expires_at_epoch_s(record: &MachinePresenceRecord) -> u64 {
    let heartbeat = record.last_heartbeat_at_epoch_s;
    heartbeat.saturating_add(record.lease_ttl_secs)
}
/// Normalizes a string list: drops empty entries, sorts ascending, and
/// removes duplicates.
fn dedupe_sorted(mut values: Vec<String>) -> Vec<String> {
    values.retain(|value| !value.is_empty());
    values.sort();
    values.dedup();
    values
}
/// Sort key for presence entries: rank 0 = active lease, 1 = expired lease,
/// 2 = unparseable record, with ties broken by machine id so the output
/// order is deterministic.
fn presence_sort_key(record: &PresenceRecordEntry, now_epoch_s: u64) -> (u8, String) {
    match record {
        PresenceRecordEntry::Loaded(loaded) => {
            let expired = lease_expires_at_epoch_s(&loaded.record) <= now_epoch_s;
            (u8::from(expired), loaded.record.machine_id.clone())
        }
        PresenceRecordEntry::Invalid(invalid) => (2, invalid.machine_id.clone()),
    }
}