tag2upload_service_manager/
o2m_tracker.rs1
2use crate::prelude::*;
3
4use crate::o2m_support::OracleSeq;
5
6use std::sync::{Mutex, MutexGuard};
7
8#[derive(Debug, Default)]
9pub struct WorkerTracker {
10 state: Mutex<State>,
11}
12
13#[derive(Debug, Default)]
14struct State {
15 entries: Slab<WorkerReport>,
16}
17
18#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Deftly)]
19#[derive_deftly(UiMap)]
20pub struct WorkerReport {
21 #[deftly(ui(skip))] pub seq: OracleSeq,
23 pub ident: String,
24 pub protocol: Option<o2m_messages::ProtocolVersion>,
25 pub fidelity: Option<WorkerFidelity>,
26 pub last_contact: TimeT,
27 pub phase: WorkerPhase,
28 pub source: NoneIsEmpty<PackageName>,
29 pub version: NoneIsEmpty<VersionString>,
30 pub status: Option<JobStatus>,
31 pub info: Option<String>,
32}
33
34#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Deftly)]
35#[derive_deftly(UiDisplayEnum)]
36pub enum WorkerPhase {
37 Init,
38 Idle,
39 Selected,
40 Building,
41 Disconnected,
42}
43
44#[derive(Debug)]
45pub struct TrackedWorker {
46 tracker: Arc<WorkerTracker>,
47 lid: usize,
49}
50
51impl WorkerTracker {
52 pub fn new_worker(
53 self: Arc<Self>,
54 initial: WorkerReport,
55 ) -> TrackedWorker {
56 let mut state = self.lock();
57 state.expire();
58
59 let lid = state.entries.insert(initial);
60 let tracker = self.clone();
61 TrackedWorker { tracker, lid }
62 }
63
64 pub fn list_workers(&self) -> Vec<WorkerReport> {
65 let mut list = {
66 let mut state = self.lock();
67 state.expire();
68 state.entries.iter()
69 .map(|(_, r)| r.clone())
70 .collect_vec()
71 };
72 list.sort();
73 list
74 }
75
76 fn lock(&self) -> MutexGuard<'_, State> {
77 self.state.lock().unwrap_or_else(|e| e.into_inner())
78 }
79}
80
81impl WorkerReport {
82 fn should_expire(&self, cutoff: TimeT) -> bool {
83 match self.phase {
84 WorkerPhase::Disconnected => self.last_contact < cutoff,
85 _ => false,
86 }
87 }
88}
89
90impl TrackedWorker {
91 pub fn update(&mut self, f: impl FnOnce(&mut WorkerReport)) {
92 let mut state = self.tracker.lock();
93 if let Some(ent) = state.entries.get_mut(self.lid) {
94 f(ent)
95 }
96 }
97
98 pub fn become_untracked(&mut self) {
99 let mut state = self.tracker.lock();
100 state.entries.try_remove(self.lid);
101 self.lid = usize::MAX;
102 }
103}
104
105impl State {
106 fn expire(&mut self) {
107 let gl = globals();
108 let now = gl.now();
109 let config = &gl.config;
110
111 let Some(cutoff) = now
112 .checked_sub(
113 config.timeouts
114 .disconnected_worker_expire
115 .as_secs()
116 )
117 else { return };
118
119 self.entries.retain(|_, ent| ! ent.should_expire(cutoff));
120 }
121}
122
123impl Drop for TrackedWorker {
124 fn drop(&mut self) {
125 let now = globals().now();
126 let mut state = self.tracker.lock();
127 if let Some(ent) = state.entries.get_mut(self.lid) {
128 ent.last_contact = now;
129 ent.phase = WorkerPhase::Disconnected;
130 }
131 }
132}
133
134define_derive_deftly! {
135 export UpdateWorkerReport:
136
137 impl $ttype {
138 pub fn update_worker_report(&self, wr: &mut WorkerReport) {
139 match self { $(
140 #[allow(unused_variables)]
141 $vpat => {
142 $(
143 ${when fmeta(worker_report)}
144 wr.$fname = Some($fpatname.clone()).into();
145 )
146 },
147 ) }
148 }
149 }
150}
151pub use derive_deftly_template_UpdateWorkerReport;