vs_daemon/daemon/
pending.rs1use std::collections::HashMap;
15use std::sync::{Arc, Condvar, Mutex};
16use std::time::{Duration, Instant};
17
/// One request parked in a [`PendingQueue`] until it is fulfilled, cancelled or times out.
///
/// The queue itself only reads `id`; every other field is carried along unchanged and
/// handed back by `PendingQueue::list` / `PendingQueue::peek`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingEntry {
    /// Key under which the queue stores the entry; `fulfill`, `cancel` and `peek` look
    /// entries up by this value.
    pub id: String,
    /// NOTE(review): presumably the page the request originated from — confirm with callers.
    pub page: String,
    /// NOTE(review): meaning is not established in this file — confirm with callers.
    pub r: u32,
    /// NOTE(review): presumably the text shown to whoever resolves the request — confirm.
    pub message: String,
    /// NOTE(review): presumably marks the expected answer as sensitive — confirm.
    pub secret: bool,
    /// NOTE(review): meaning is not established in this file — confirm with callers.
    pub token: String,
    /// Optional grouping label; `None` when the entry belongs to no group.
    pub group: Option<String>,
    /// Timestamp supplied by whoever constructs the entry; the queue never reads it.
    pub created_at: Instant,
}
33
/// Resolution state of an entry held in a [`PendingQueue`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FulfillState {
    /// Nobody has answered yet; the enqueuing thread is still waiting.
    Pending,
    /// Answered with the contained value, which is handed to the waiting thread.
    Fulfilled(String),
    /// Withdrawn without a value; the waiting thread receives `None`.
    Cancelled,
}
41
42#[derive(Default)]
46pub struct PendingQueue {
47 inner: Mutex<HashMap<String, (PendingEntry, FulfillState)>>,
48 cv: Condvar,
49}
50
51impl PendingQueue {
52 #[must_use]
53 pub fn new() -> Arc<Self> {
54 Arc::new(Self::default())
55 }
56
57 #[must_use]
61 pub fn enqueue_and_wait(&self, entry: PendingEntry, timeout: Duration) -> Option<String> {
62 let id = entry.id.clone();
63 {
64 let mut guard = self.inner.lock().unwrap();
65 guard.insert(id.clone(), (entry, FulfillState::Pending));
66 }
67 let deadline = Instant::now() + timeout;
68 let mut guard = self.inner.lock().unwrap();
69 loop {
70 let remaining = match deadline.checked_duration_since(Instant::now()) {
71 Some(r) if !r.is_zero() => r,
72 _ => {
73 guard.remove(&id);
74 return None;
75 }
76 };
77 let (g, _) = self.cv.wait_timeout(guard, remaining).unwrap();
78 guard = g;
79 let (_, state) = guard.get(&id)?;
80 match state.clone() {
81 FulfillState::Pending => {}
82 FulfillState::Fulfilled(v) => {
83 guard.remove(&id);
84 return Some(v);
85 }
86 FulfillState::Cancelled => {
87 guard.remove(&id);
88 return None;
89 }
90 }
91 }
92 }
93
94 #[must_use]
96 pub fn list(&self) -> Vec<PendingEntry> {
97 let guard = self.inner.lock().unwrap();
98 guard
99 .values()
100 .filter(|(_, s)| matches!(s, FulfillState::Pending))
101 .map(|(e, _)| e.clone())
102 .collect()
103 }
104
105 pub fn fulfill(&self, id: &str, value: String) -> bool {
107 let mut guard = self.inner.lock().unwrap();
108 if let Some((_, state)) = guard.get_mut(id) {
109 if matches!(state, FulfillState::Pending) {
110 *state = FulfillState::Fulfilled(value);
111 self.cv.notify_all();
112 return true;
113 }
114 }
115 false
116 }
117
118 pub fn cancel(&self, id: &str) -> bool {
120 let mut guard = self.inner.lock().unwrap();
121 if let Some((_, state)) = guard.get_mut(id) {
122 if matches!(state, FulfillState::Pending) {
123 *state = FulfillState::Cancelled;
124 self.cv.notify_all();
125 return true;
126 }
127 }
128 false
129 }
130
131 #[must_use]
133 pub fn peek(&self, id: &str) -> Option<PendingEntry> {
134 let guard = self.inner.lock().unwrap();
135 guard
136 .get(id)
137 .filter(|(_, s)| matches!(s, FulfillState::Pending))
138 .map(|(e, _)| e.clone())
139 }
140}
141
142use std::sync::atomic::{AtomicU64, Ordering};
143use std::time::SystemTime;
144
/// Process-wide sequence number mixed into every id produced by [`new_id`].
static COUNTER: AtomicU64 = AtomicU64::new(0);

/// Generates a fresh identifier of the form `p_` followed by 16 lowercase hex digits.
///
/// The digits are the low 64 bits of the wall-clock time in nanoseconds since the Unix
/// epoch, scrambled by a wrapping multiplication and XOR-ed with a per-process counter
/// that advances on every call. If the system clock reads earlier than the epoch the
/// time component is taken as zero.
#[must_use]
pub fn new_id() -> String {
    // Odd 64-bit multiplier used to spread the timestamp bits across the whole word.
    const MIX: u64 = 0x9E37_79B9_7F4A_7C15;

    let since_epoch = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0_u128,
    };
    // Relaxed is enough: only the distinctness of the returned values matters.
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);
    // Truncation is intentional: only the low 64 bits of the timestamp are mixed in.
    #[allow(clippy::cast_possible_truncation)]
    let low_bits = since_epoch as u64;
    let mixed = low_bits.wrapping_mul(MIX) ^ sequence;
    format!("p_{mixed:016x}")
}