use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use chrono::Utc;
use tracing;
use aegis_orchestrator_core::domain::cluster::NodeId;
use aegis_orchestrator_core::infrastructure::aegis_cluster_proto::{
node_command::Command, NodeCapabilities, NodeCommand,
};
use aegis_orchestrator_core::infrastructure::cluster::NodeClusterClient;
/// Drives the full lifetime of a worker node against the cluster
/// controller: connect, attest, register, heartbeat, and deregister.
///
/// Construct with [`WorkerLifecycle::new`] and consume with `run`.
pub struct WorkerLifecycle {
// gRPC client used for all controller RPCs (connect/attest/register/heartbeat/deregister).
client: NodeClusterClient,
// This node's cluster-wide identifier; used throughout for structured logging.
node_id: NodeId,
// Node role sent during attestation (raw protobuf enum value).
role: i32,
// Capabilities advertised to the controller at attest/register time.
capabilities: NodeCapabilities,
// Address other nodes/the controller can reach this worker's gRPC server on.
grpc_address: String,
// How often the heartbeat loop ticks (also paces token-refresh checks).
heartbeat_interval: Duration,
// Re-attest this long before the auth token's expiry.
token_refresh_margin: Duration,
// Ed25519 key whose public half is sent for attestation; shared via Arc.
signing_key: Arc<ed25519_dalek::SigningKey>,
}
impl WorkerLifecycle {
#[allow(clippy::too_many_arguments)]
pub fn new(
client: NodeClusterClient,
node_id: NodeId,
role: i32,
capabilities: NodeCapabilities,
grpc_address: String,
heartbeat_interval: Duration,
token_refresh_margin: Duration,
signing_key: Arc<ed25519_dalek::SigningKey>,
) -> Self {
Self {
client,
node_id,
role,
capabilities,
grpc_address,
heartbeat_interval,
token_refresh_margin,
signing_key,
}
}
pub async fn run(mut self, mut shutdown: tokio::sync::watch::Receiver<bool>) -> Result<()> {
tracing::info!("Worker connecting to cluster controller");
self.client
.connect()
.await
.context("Failed to connect to cluster controller")?;
let public_key = self.signing_key.verifying_key().to_bytes().to_vec();
tracing::info!(node_id = %self.node_id, "Worker performing attestation handshake");
self.client
.attest_and_challenge(
self.role,
public_key,
self.capabilities.clone(),
self.grpc_address.clone(),
)
.await
.context("Attestation handshake failed")?;
tracing::info!(node_id = %self.node_id, "Worker attestation succeeded");
tracing::info!(node_id = %self.node_id, "Worker registering with controller");
let cluster_id = self
.client
.register(self.capabilities.clone(), self.grpc_address.clone())
.await
.context("RegisterNode RPC failed")?;
tracing::info!(
node_id = %self.node_id,
cluster_id = %cluster_id,
"Worker registered successfully"
);
let mut interval = tokio::time::interval(self.heartbeat_interval);
loop {
tokio::select! {
_ = interval.tick() => {
if let Some(expires_at) = self.client.token_expires_at().await {
let margin = chrono::Duration::from_std(self.token_refresh_margin)
.unwrap_or_else(|_| chrono::Duration::minutes(5));
let refresh_at = expires_at - margin;
if Utc::now() >= refresh_at {
tracing::info!(
node_id = %self.node_id,
expires_at = %expires_at,
"Token approaching expiry, re-attesting"
);
let public_key = self.signing_key.verifying_key().to_bytes().to_vec();
match self.client.attest_and_challenge(
self.role,
public_key,
self.capabilities.clone(),
self.grpc_address.clone(),
).await {
Ok(_) => tracing::info!(node_id = %self.node_id, "Token refreshed successfully"),
Err(e) => tracing::warn!(
error = %e,
node_id = %self.node_id,
"Token refresh failed, will retry on next interval"
),
}
}
}
match self.client.heartbeat(0.0, 0).await {
Ok(commands) => {
for cmd in commands {
self.process_command(cmd).await;
}
}
Err(e) => {
tracing::warn!(
error = %e,
node_id = %self.node_id,
"Heartbeat failed, will retry on next interval"
);
}
}
}
_ = shutdown.changed() => {
tracing::info!(
node_id = %self.node_id,
"Worker lifecycle received shutdown signal"
);
break;
}
}
}
tracing::info!(node_id = %self.node_id, "Worker deregistering from cluster");
if let Err(e) = self
.client
.deregister("graceful shutdown".to_string())
.await
{
tracing::warn!(
error = %e,
node_id = %self.node_id,
"Failed to deregister from cluster"
);
}
Ok(())
}
async fn process_command(&mut self, command: NodeCommand) {
match command.command {
Some(Command::Drain(drain_cmd)) => {
tracing::info!(
node_id = %self.node_id,
drain = drain_cmd.drain,
"Received Drain command from controller"
);
}
Some(Command::PushConfig(config_cmd)) => {
tracing::info!(
node_id = %self.node_id,
config_version = %config_cmd.config_version,
"Received PushConfig command from controller"
);
}
Some(Command::Shutdown(shutdown_cmd)) => {
tracing::info!(
node_id = %self.node_id,
reason = %shutdown_cmd.reason,
"Received Shutdown command from controller"
);
}
None => {
tracing::debug!(
node_id = %self.node_id,
"Received empty NodeCommand (no inner command set)"
);
}
}
}
}
#[cfg(test)]
mod tests {
    // `super::*` already brings in NodeId, NodeClusterClient, Duration,
    // Arc, and NodeCapabilities; only the extra crates are named here.
    use super::*;
    use ed25519_dalek::SigningKey;
    use uuid::Uuid;

    /// Constructing a `WorkerLifecycle` must store the configured
    /// token-refresh margin unchanged.
    #[test]
    fn token_refresh_margin_is_stored_and_accessible() {
        let key = Arc::new(SigningKey::generate(&mut rand::rngs::OsRng));
        let id = NodeId(Uuid::new_v4());
        let refresh_margin = Duration::from_secs(300);
        let client = NodeClusterClient::new(
            "http://localhost:50051".to_string(),
            key.clone(),
            id,
        );

        let lifecycle = WorkerLifecycle::new(
            client,
            id,
            1,
            NodeCapabilities::default(),
            "127.0.0.1:50052".to_string(),
            Duration::from_secs(30),
            refresh_margin,
            key,
        );

        assert_eq!(lifecycle.token_refresh_margin, refresh_margin);
    }
}