datafusion-dist 0.3.0

A distributed streaming execution library for Apache DataFusion
Documentation
use std::{collections::HashMap, sync::Arc, time::Duration};

use log::{debug, error};
use parking_lot::Mutex;

use crate::{
    cluster::{DistCluster, NodeId, NodeState, NodeStatus},
    planner::StageId,
    runtime::StageState,
};

#[derive(Debug, Clone)]
pub struct Heartbeater {
    pub node_id: NodeId,
    pub cluster: Arc<dyn DistCluster>,
    pub stages: Arc<Mutex<HashMap<StageId, StageState>>>,
    pub heartbeat_interval: Duration,
    pub status: Arc<Mutex<NodeStatus>>,
}

impl Heartbeater {
    pub async fn send_heartbeat(&self) {
        let mut num_running_tasks = 0;
        {
            let guard = self.stages.lock();
            for (_, state) in guard.iter() {
                num_running_tasks += state.num_running_tasks();
            }
            drop(guard);
        }

        let mut sys = sysinfo::System::new();
        sys.refresh_memory();
        sys.refresh_cpu_usage();

        let node_state = NodeState {
            status: *self.status.lock(),
            total_memory: sys.total_memory(),
            used_memory: sys.used_memory(),
            free_memory: sys.free_memory(),
            available_memory: sys.available_memory(),
            global_cpu_usage: sys.global_cpu_usage(),
            num_running_tasks: num_running_tasks as u32,
        };
        match self
            .cluster
            .heartbeat(self.node_id.clone(), node_state)
            .await
        {
            Ok(_) => {
                debug!("Heartbeat sent successfully");
            }
            Err(e) => {
                error!("Failed to send heartbeat: {e:?}");
            }
        }
    }

    pub fn start(&self) {
        let heartbeater = self.clone();
        let heartbeat_interval = self.heartbeat_interval;

        tokio::spawn(async move {
            loop {
                heartbeater.send_heartbeat().await;
                tokio::time::sleep(heartbeat_interval).await;
            }
        });
    }
}