datafusion-dist 0.1.0

A distributed streaming execution library for Apache DataFusion
Documentation
use std::{collections::HashMap, sync::Arc, time::Duration};

use log::error;
use tokio::sync::Mutex;

use crate::{
    cluster::{DistCluster, NodeId, NodeState},
    planner::StageId,
    runtime::StageState,
};

/// Periodically reports this node's resource usage and running-task count to
/// the cluster; the reporting loop is launched by `Heartbeater::start`.
#[derive(Debug, Clone)]
pub struct Heartbeater {
    /// Identity of the local node; passed along with every heartbeat.
    pub node_id: NodeId,
    /// Cluster backend whose `heartbeat` method receives the node state.
    pub cluster: Arc<dyn DistCluster>,
    /// Shared registry of stages on this node; the running-task counts of all
    /// entries are summed into each heartbeat.
    pub stages: Arc<Mutex<HashMap<StageId, Arc<StageState>>>>,
    /// Pause after each heartbeat before the next one is collected and sent.
    pub heartbeat_interval: Duration,
}

impl Heartbeater {
    /// Creates a heartbeater for `node_id` that reports to `cluster`.
    ///
    /// `stages` is the shared stage map whose running-task counts are summed
    /// into every heartbeat, and `heartbeat_interval` is the pause taken after
    /// each heartbeat. Nothing runs until [`Heartbeater::start`] is called.
    pub fn new(
        node_id: NodeId,
        cluster: Arc<dyn DistCluster>,
        stages: Arc<Mutex<HashMap<StageId, Arc<StageState>>>>,
        heartbeat_interval: Duration,
    ) -> Self {
        Heartbeater {
            node_id,
            cluster,
            stages,
            heartbeat_interval,
        }
    }

    /// Spawns a detached background task that reports this node's state to
    /// the cluster in an endless loop.
    ///
    /// Each iteration sums the running-task counts of all registered stages,
    /// samples memory and CPU usage via `sysinfo`, sends the resulting
    /// `NodeState` through `DistCluster::heartbeat`, and then sleeps for
    /// `heartbeat_interval`. A failed heartbeat is logged and does not stop
    /// the loop; the next iteration simply tries again.
    ///
    /// # Panics
    ///
    /// Panics if called outside a Tokio runtime (the task is started with
    /// `tokio::spawn`).
    pub fn start(&self) {
        let node_id = self.node_id.clone();
        let cluster = Arc::clone(&self.cluster);
        let stages = Arc::clone(&self.stages);
        let heartbeat_interval = self.heartbeat_interval;

        tokio::spawn(async move {
            let mut sys = sysinfo::System::new();

            loop {
                // Snapshot the stage handles and release the map lock at the end of
                // this statement, so the per-stage awaits below never run while
                // `stages` is locked (avoids blocking stage registration and
                // lock-ordering deadlocks with the stages' own internal locks).
                let snapshot: Vec<Arc<StageState>> =
                    stages.lock().await.values().cloned().collect();

                let mut num_running_tasks = 0;
                for state in &snapshot {
                    num_running_tasks += state.num_running_tasks().await;
                }

                sys.refresh_memory();
                // NOTE(review): sysinfo derives CPU usage from the difference between
                // two refreshes, so the very first heartbeat likely reports ~0% CPU.
                sys.refresh_cpu_usage();

                let node_state = NodeState {
                    total_memory: sys.total_memory(),
                    used_memory: sys.used_memory(),
                    free_memory: sys.free_memory(),
                    available_memory: sys.available_memory(),
                    global_cpu_usage: sys.global_cpu_usage(),
                    // Saturate rather than silently truncate if the count ever
                    // exceeds u32::MAX.
                    num_running_tasks: u32::try_from(num_running_tasks).unwrap_or(u32::MAX),
                };
                if let Err(e) = cluster.heartbeat(node_id.clone(), node_state).await {
                    error!("Failed to send heartbeat: {e}");
                }

                tokio::time::sleep(heartbeat_interval).await;
            }
        });
    }
}