//! ostool-server 0.2.1
//!
//! Server for managing development boards, serial sessions, and TFTP artifacts.
use std::sync::{
    Arc,
    atomic::{AtomicBool, AtomicU8, Ordering},
};

use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, mpsc, watch};

use crate::{config::BoardConfig, state::AppState};

/// Lifetime granted to a session at creation and on every heartbeat.
///
/// [`Session::new_with_id`] and [`Session::touch`] both set `expires_at` to
/// the current time plus this duration.
pub const SESSION_TTL: Duration = Duration::seconds(2);

/// Raw `u8` encoding of [`SessionLifecycleState::Active`], as stored in the
/// atomic lifecycle field of `SessionState`.
const SESSION_STATE_ACTIVE: u8 = 0;
/// Raw `u8` encoding of [`SessionLifecycleState::Releasing`], as stored in the
/// atomic lifecycle field of `SessionState`.
const SESSION_STATE_RELEASING: u8 = 1;

/// Lifecycle phase of a session, as exposed in serialized [`Session`] data.
///
/// Serialized with snake_case variant names (`"active"` / `"releasing"`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SessionLifecycleState {
    /// State every newly constructed session starts in.
    Active,
    /// State entered through `SessionState::begin_release`. In the code
    /// visible in this file nothing moves a session back to `Active`.
    Releasing,
}

/// Serde default for [`Session::state`]: a payload that omits the `state`
/// field deserializes as [`SessionLifecycleState::Active`].
fn default_session_state() -> SessionLifecycleState {
    SessionLifecycleState::Active
}

impl SessionLifecycleState {
    /// Encodes the state as the raw byte kept in an `AtomicU8`.
    fn as_u8(self) -> u8 {
        match self {
            Self::Releasing => SESSION_STATE_RELEASING,
            Self::Active => SESSION_STATE_ACTIVE,
        }
    }

    /// Decodes a raw byte back into a state.
    ///
    /// Only `SESSION_STATE_RELEASING` maps to `Releasing`; every other value,
    /// including ones that were never produced by `as_u8`, decodes as `Active`.
    fn from_u8(value: u8) -> Self {
        if value == SESSION_STATE_RELEASING {
            Self::Releasing
        } else {
            Self::Active
        }
    }
}

/// Why a session is being stopped.
///
/// The reason travels inside the stop command sent to the session actor, which
/// forwards it to `AppState::enqueue_release`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStopReason {
    /// Presumably an explicit delete request through the API; the call site is
    /// not in this file — confirm against the caller.
    ApiDelete,
    /// Presumably the serial connection went away; the call site is not in
    /// this file — confirm against the caller.
    SerialClosed,
    /// Presumably the session passed its `expires_at`; the call site is not in
    /// this file — confirm against the caller.
    Expired,
    /// Sent by the `Drop` impl of `SessionState` when the state is dropped
    /// while still `Active`.
    Dropped,
}

/// Message sent over the unbounded command channel from a `SessionState` to
/// its actor task.
#[derive(Debug)]
enum SessionCommand {
    /// Ask the actor to start releasing the session for the given reason.
    Stop(SessionStopReason),
}

/// Serializable record describing one session.
///
/// This is the value handed out by `SessionState::snapshot` and
/// `SessionState::heartbeat`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Session identifier; a random UUID v4 string when built via
    /// [`Session::new`], caller-supplied via [`Session::new_with_id`].
    pub id: String,
    /// Identifier of the board this session refers to.
    pub board_id: String,
    /// Optional client name; stored as given and not interpreted in this file.
    pub client_name: Option<String>,
    /// Time at which the record was constructed.
    pub created_at: DateTime<Utc>,
    /// Time of the most recent [`Session::touch`]; equals `created_at` until
    /// the first touch.
    pub last_heartbeat_at: DateTime<Utc>,
    /// `last_heartbeat_at` plus [`SESSION_TTL`].
    pub expires_at: DateTime<Utc>,
    /// Whether a serial connection is attached. Defaults to `false` when the
    /// field is absent from deserialized input.
    #[serde(default)]
    pub serial_connected: bool,
    /// Lifecycle phase. Defaults to `Active` when the field is absent from
    /// deserialized input.
    #[serde(default = "default_session_state")]
    pub state: SessionLifecycleState,
}

impl Session {
    /// Builds a session record for `board_id` under a freshly generated
    /// random (v4) UUID.
    pub fn new(board_id: String, client_name: Option<String>) -> Self {
        let id = uuid::Uuid::new_v4().to_string();
        Self::new_with_id(id, board_id, client_name)
    }

    /// Builds a session record with a caller-chosen `id`.
    ///
    /// The record starts `Active`, without a serial connection, with
    /// `created_at` and `last_heartbeat_at` both set to the current UTC time
    /// and `expires_at` one [`SESSION_TTL`] later.
    pub fn new_with_id(id: String, board_id: String, client_name: Option<String>) -> Self {
        let created_at = Utc::now();
        Self {
            id,
            board_id,
            client_name,
            created_at,
            last_heartbeat_at: created_at,
            expires_at: created_at + SESSION_TTL,
            serial_connected: false,
            state: SessionLifecycleState::Active,
        }
    }

    /// Records a heartbeat: stamps the current UTC time and pushes the expiry
    /// to one [`SESSION_TTL`] after it.
    pub fn touch(&mut self) {
        self.last_heartbeat_at = Utc::now();
        self.expires_at = self.last_heartbeat_at + SESSION_TTL;
    }
}

/// Shared runtime state of one session.
///
/// Constructors hand this out as an `Arc<SessionState>`. Flags that must be
/// readable without taking the lock are kept in atomics alongside the locked
/// [`Session`] record.
#[derive(Debug)]
pub struct SessionState {
    /// Serializable session record; heartbeats mutate it under the write lock.
    info: RwLock<Session>,
    /// Configuration of the board this session was created for.
    board: BoardConfig,
    /// Sender side of the shutdown watch channel. The channel starts at
    /// `false`; `signal_shutdown` sends `true`.
    shutdown_tx: watch::Sender<bool>,
    /// Current lifecycle state, stored as the `u8` encoding of
    /// [`SessionLifecycleState`].
    lifecycle_state: AtomicU8,
    /// Set by the first `request_stop` that gets through, so later calls do
    /// not forward a second stop command.
    stop_requested: AtomicBool,
    /// Whether a serial connection is currently attached.
    serial_connected: AtomicBool,
    /// Command channel to the session actor; `None` for sessions created
    /// without an actor.
    command_tx: Option<mpsc::UnboundedSender<SessionCommand>>,
}

impl SessionState {
    /// Creates an actor-less session for `board` with a freshly generated
    /// random (v4) UUID as its id.
    ///
    /// No command channel is attached, so [`Self::request_stop`] records the
    /// request but has nobody to forward it to.
    pub fn new(board: BoardConfig, client_name: Option<String>) -> Arc<Self> {
        Self::new_inner(uuid::Uuid::new_v4().to_string(), board, client_name, None)
    }

    /// Creates a session with the given id and spawns its actor task.
    ///
    /// The actor owns the receiving end of the command channel and its own
    /// `Arc` clone of the returned state; it reacts to the stop command sent
    /// by [`Self::request_stop`].
    ///
    /// # Panics
    ///
    /// Uses `tokio::spawn`, which panics when called outside a Tokio runtime.
    pub fn new_with_actor(
        session_id: String,
        board: BoardConfig,
        client_name: Option<String>,
        app_state: AppState,
    ) -> Arc<Self> {
        let (command_tx, command_rx) = mpsc::unbounded_channel();
        let session = Self::new_inner(session_id, board, client_name, Some(command_tx));
        tokio::spawn(run_session_actor(
            app_state,
            Arc::clone(&session),
            command_rx,
        ));
        session
    }

    /// Shared constructor: builds the session record and all flags in their
    /// initial state (`Active`, no stop requested, serial not connected).
    fn new_inner(
        session_id: String,
        board: BoardConfig,
        client_name: Option<String>,
        command_tx: Option<mpsc::UnboundedSender<SessionCommand>>,
    ) -> Arc<Self> {
        // The initial receiver is dropped when this function returns, so the
        // channel starts out with no receivers; consumers obtain their own via
        // `subscribe_shutdown`.
        let (shutdown_tx, _shutdown_rx) = watch::channel(false);
        Arc::new(Self {
            info: RwLock::new(Session::new_with_id(
                session_id,
                board.id.clone(),
                client_name,
            )),
            board,
            shutdown_tx,
            lifecycle_state: AtomicU8::new(SessionLifecycleState::Active.as_u8()),
            stop_requested: AtomicBool::new(false),
            serial_connected: AtomicBool::new(false),
            command_tx,
        })
    }

    /// Configuration of the board this session was created for.
    pub fn board(&self) -> &BoardConfig {
        &self.board
    }

    /// Current lifecycle state, read from the atomic without taking the lock.
    pub fn lifecycle_state(&self) -> SessionLifecycleState {
        SessionLifecycleState::from_u8(self.lifecycle_state.load(Ordering::Acquire))
    }

    /// Returns a copy of the session record.
    ///
    /// `serial_connected` and `state` in the copy are overwritten with the
    /// live atomic values, because the stored record is only refreshed by
    /// [`Self::heartbeat`].
    pub async fn snapshot(&self) -> Session {
        let mut info = self.info.read().await.clone();
        info.serial_connected = self.serial_connected.load(Ordering::Acquire);
        info.state = self.lifecycle_state();
        info
    }

    /// Records a heartbeat and returns the updated record.
    ///
    /// Refreshes `last_heartbeat_at` / `expires_at` and syncs the stored
    /// `serial_connected` and `state` fields from the atomics. The lifecycle
    /// state is not checked here, so the expiry is refreshed regardless of it.
    pub async fn heartbeat(&self) -> Session {
        let mut info = self.info.write().await;
        info.touch();
        info.serial_connected = self.serial_connected.load(Ordering::Acquire);
        info.state = self.lifecycle_state();
        info.clone()
    }

    /// Atomically moves the session from `Active` to `Releasing`.
    ///
    /// Returns `true` only for the single caller that performed the
    /// transition; `false` if the session was already releasing.
    pub fn begin_release(&self) -> bool {
        self.lifecycle_state
            .compare_exchange(
                SessionLifecycleState::Active.as_u8(),
                SessionLifecycleState::Releasing.as_u8(),
                Ordering::AcqRel,
                Ordering::Acquire,
            )
            .is_ok()
    }

    /// Whether the session has entered the `Releasing` state.
    pub fn is_releasing(&self) -> bool {
        self.lifecycle_state() == SessionLifecycleState::Releasing
    }

    /// Asks the session actor to stop the session for `reason`.
    ///
    /// Does nothing if the session is already releasing or if a stop was
    /// already requested; only the first request is forwarded. For a session
    /// without an actor the request is recorded but not forwarded.
    pub fn request_stop(&self, reason: SessionStopReason) {
        if self.is_releasing() {
            return;
        }

        // Claim the "stop requested" flag; losing the race means another
        // caller has already forwarded (or is forwarding) a stop command.
        if self
            .stop_requested
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return;
        }

        if let Some(command_tx) = &self.command_tx {
            // Best effort: a send error means the actor's receiver is gone.
            let _ = command_tx.send(SessionCommand::Stop(reason));
        }
    }

    /// Returns a new receiver for the shutdown flag.
    ///
    /// The flag reads `false` until [`Self::signal_shutdown`] is called and
    /// `true` afterwards. A receiver created after the signal sees `true`
    /// through `borrow()`, but `changed()` will not fire for it because a new
    /// receiver treats the current value as already seen.
    pub fn subscribe_shutdown(&self) -> watch::Receiver<bool> {
        self.shutdown_tx.subscribe()
    }

    /// Sets the shutdown flag to `true` and notifies current receivers.
    pub fn signal_shutdown(&self) {
        // `watch::Sender::send` does not store the value when no receiver is
        // alive, which would hide the shutdown from anyone subscribing later.
        // `send_replace` always stores it and still notifies live receivers.
        let _ = self.shutdown_tx.send_replace(true);
    }

    /// Atomically claims the serial connection slot.
    ///
    /// Returns `true` if the flag went from `false` to `true`, `false` if a
    /// serial connection was already marked as attached.
    pub fn try_set_serial_connected(&self) -> bool {
        self.serial_connected
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Marks the serial connection as detached.
    pub fn clear_serial_connected(&self) {
        self.set_serial_connected(false);
    }

    /// Unconditionally sets the serial-connected flag.
    pub fn set_serial_connected(&self, connected: bool) {
        self.serial_connected.store(connected, Ordering::Release);
    }

    /// Whether a serial connection is currently marked as attached.
    pub fn is_serial_connected(&self) -> bool {
        self.serial_connected.load(Ordering::Acquire)
    }
}

/// Last-chance cleanup: a state dropped while still `Active` sends a
/// `Stop(Dropped)` command to its actor, if it has one.
///
/// NOTE(review): the actor spawned by `new_with_actor` owns an
/// `Arc<SessionState>` until it has handled a stop command, and by then the
/// session is `Releasing`. In the code visible here the send below therefore
/// looks unreachable for actor-backed sessions — confirm whether the actor
/// was meant to hold a weaker reference.
impl Drop for SessionState {
    fn drop(&mut self) {
        if let (SessionLifecycleState::Active, Some(actor_tx)) =
            (self.lifecycle_state(), self.command_tx.as_ref())
        {
            // Best effort: nothing can be done here if the receiver is gone.
            let _ = actor_tx.send(SessionCommand::Stop(SessionStopReason::Dropped));
        }
    }
}

/// Actor task for one session: waits for a single stop command and then
/// starts the release sequence.
///
/// On `Stop(reason)` it claims the `Active` -> `Releasing` transition,
/// signals shutdown to subscribers, marks the board as releasing and enqueues
/// the release job on `app_state`. Failures of the last two steps are logged
/// as warnings and do not abort the sequence. The task ends after handling
/// one command, or immediately if the command channel is closed.
async fn run_session_actor(
    app_state: AppState,
    session: Arc<SessionState>,
    mut command_rx: mpsc::UnboundedReceiver<SessionCommand>,
) {
    // `None` means every sender is gone, so there is nothing to act on.
    let Some(SessionCommand::Stop(reason)) = command_rx.recv().await else {
        return;
    };

    // Somebody else already moved the session to `Releasing`; leave the
    // release sequence to them.
    if !session.begin_release() {
        return;
    }

    session.signal_shutdown();
    let snapshot = session.snapshot().await;

    let board_marked = app_state
        .transition_board_to_releasing(&snapshot.board_id, &snapshot.id)
        .await;
    if let Err(err) = board_marked {
        log::warn!(
            "failed to mark board `{}` releasing for session `{}`: {err}",
            snapshot.board_id,
            snapshot.id
        );
    }

    let enqueued = app_state.enqueue_release(Arc::clone(&session), reason);
    if let Err(err) = enqueued {
        log::warn!(
            "failed to enqueue release job for session `{}`: {err}",
            snapshot.id
        );
    }
}

#[cfg(test)]
mod tests {
    use std::thread;

    use super::{SESSION_TTL, Session, SessionLifecycleState, SessionState};
    use crate::config::{
        BoardConfig, BootConfig, CustomPowerManagement, PowerManagementConfig, PxeProfile,
    };

    /// Minimal board fixture: custom power commands, default PXE boot, no
    /// serial port.
    fn sample_board() -> BoardConfig {
        let power_management = PowerManagementConfig::Custom(CustomPowerManagement {
            power_on_cmd: "echo on".into(),
            power_off_cmd: "echo off".into(),
        });

        BoardConfig {
            id: "demo".into(),
            board_type: "demo".into(),
            tags: vec![],
            serial: None,
            power_management,
            boot: BootConfig::Pxe(PxeProfile::default()),
            notes: None,
            disabled: false,
        }
    }

    /// A fresh session is active, has no heartbeat beyond its creation time
    /// and expires exactly one TTL after creation.
    #[test]
    fn session_new_uses_fixed_ttl() {
        let fresh = Session::new("demo".into(), Some("client".into()));
        assert_eq!(fresh.state, SessionLifecycleState::Active);
        assert_eq!(fresh.last_heartbeat_at, fresh.created_at);
        assert_eq!(fresh.expires_at - fresh.created_at, SESSION_TTL);
    }

    /// A heartbeat moves both the heartbeat timestamp and the expiry forward.
    #[tokio::test]
    async fn session_state_heartbeat_updates_expiry() {
        let state = SessionState::new(sample_board(), Some("client".into()));
        let before = state.snapshot().await;

        // Let the wall clock advance so the timestamps are strictly ordered.
        // A blocking sleep is used; this test spawns no other tasks.
        thread::sleep(std::time::Duration::from_millis(10));

        let after = state.heartbeat().await;
        assert!(after.expires_at > before.expires_at);
        assert!(after.last_heartbeat_at > before.last_heartbeat_at);
    }

    /// Only the first `begin_release` wins; the session stays releasing.
    #[test]
    fn session_state_release_is_idempotent() {
        let state = SessionState::new(sample_board(), None);
        let first_attempt = state.begin_release();
        let second_attempt = state.begin_release();
        assert!(first_attempt);
        assert!(!second_attempt);
        assert!(state.is_releasing());
    }
}