tag2upload_service_manager/
o2m_tracker.rs

1
2use crate::prelude::*;
3
4use crate::o2m_support::OracleSeq;
5
6use std::sync::{Mutex, MutexGuard};
7
8#[derive(Debug, Default)]
9pub struct WorkerTracker {
10    state: Mutex<State>,
11}
12
13#[derive(Debug, Default)]
14struct State {
15    entries: Slab<WorkerReport>,
16}
17
18#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Deftly)]
19#[derive_deftly(UiMap)]
20pub struct WorkerReport {
21    #[deftly(ui(skip))] // Just the sort key
22    pub seq: OracleSeq,
23    pub ident: String,
24    pub protocol: Option<o2m_messages::ProtocolVersion>,
25    pub fidelity: Option<WorkerFidelity>,
26    pub last_contact: TimeT,
27    pub phase: WorkerPhase,
28    pub source: NoneIsEmpty<PackageName>,
29    pub version: NoneIsEmpty<VersionString>,
30    pub status: Option<JobStatus>,
31    pub info: Option<String>,
32}
33
34#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Deftly)]
35#[derive_deftly(UiDisplayEnum)]
36pub enum WorkerPhase {
37    Init,
38    Idle,
39    Selected,
40    Building,
41    Disconnected,
42}
43
44#[derive(Debug)]
45pub struct TrackedWorker {
46    tracker: Arc<WorkerTracker>,
47    /// Can be `usize::MAX` meaning no entry, this is no longer tracked
48    lid: usize,
49}
50
51impl WorkerTracker {
52    pub fn new_worker(
53        self: Arc<Self>,
54        initial: WorkerReport,
55    ) -> TrackedWorker {
56        let mut state = self.lock();
57        state.expire();
58
59        let lid = state.entries.insert(initial);
60        let tracker = self.clone();
61        TrackedWorker { tracker, lid }
62    }
63
64    pub fn list_workers(&self) -> Vec<WorkerReport> {
65        let mut list = {
66            let mut state = self.lock();
67            state.expire();
68            state.entries.iter()
69                .map(|(_, r)| r.clone())
70                .collect_vec()
71        };
72        list.sort();
73        list
74    }
75
76    fn lock(&self) -> MutexGuard<'_, State> {
77        self.state.lock().unwrap_or_else(|e| e.into_inner())
78    }
79}
80
81impl WorkerReport {
82    fn should_expire(&self, cutoff: TimeT) -> bool {
83        match self.phase {
84            WorkerPhase::Disconnected => self.last_contact < cutoff,
85            _ => false,
86        }
87    }
88}
89
90impl TrackedWorker {
91    pub fn update(&mut self, f: impl FnOnce(&mut WorkerReport)) {
92        let mut state = self.tracker.lock();
93        if let Some(ent) = state.entries.get_mut(self.lid) {
94            f(ent)
95        }
96    }
97
98    pub fn become_untracked(&mut self) {
99        let mut state = self.tracker.lock();
100        state.entries.try_remove(self.lid);
101        self.lid = usize::MAX;
102    }
103}
104
105impl State {
106    fn expire(&mut self) {
107        let gl = globals();
108        let now = gl.now();
109        let config = &gl.config;
110
111        let Some(cutoff) = now
112            .checked_sub(
113                config.timeouts
114                    .disconnected_worker_expire
115                    .as_secs()
116            )
117        else { return };
118
119        self.entries.retain(|_, ent| ! ent.should_expire(cutoff));
120    }
121}
122
123impl Drop for TrackedWorker {
124    fn drop(&mut self) {
125        let now = globals().now();
126        let mut state = self.tracker.lock();
127        if let Some(ent) = state.entries.get_mut(self.lid) {
128            ent.last_contact = now;
129            ent.phase = WorkerPhase::Disconnected;
130        }
131    }
132}
133
134define_derive_deftly! {
135    export UpdateWorkerReport:
136
137    impl $ttype {
138        pub fn update_worker_report(&self, wr: &mut WorkerReport) {
139            match self { $(
140                #[allow(unused_variables)]
141                $vpat => {
142                    $(
143                        ${when fmeta(worker_report)}
144                        wr.$fname = Some($fpatname.clone()).into();
145                    )
146                },
147            ) }
148        }
149    }
150}
151pub use derive_deftly_template_UpdateWorkerReport;