Skip to main content

shard_core/
ops.rs

1use parking_lot::RwLock;
2use std::collections::HashMap;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tracing::{info, warn};
7
/// Kind of operation tracked by an `OpQueue`.
///
/// The kind decides when an operation may start: `acquire` and
/// `promote_pending` hold a read back only while a write is running, and
/// hold a write back while anything at all is running.
#[derive(Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
pub enum OpKind {
    /// Operation that may run alongside other reads.
    Read,
    /// Operation that runs only when nothing else is running.
    Write,
}
13
/// Handle to a queue of read/write operations.
///
/// Cloning the handle clones the `Arc`, so every clone refers to the same
/// running and pending lists.
#[derive(Clone)]
pub struct OpQueue {
    // Shared state; all access goes through the lock.
    inner: Arc<RwLock<OpQueueInner>>,
}
18
/// Lock-protected state behind an `OpQueue`.
struct OpQueueInner {
    // Operations that have been admitted and not yet released.
    running: Vec<OpEntry>,
    // Operations waiting to be admitted, oldest first (entries are pushed
    // at the back and promoted from the front).
    pending: Vec<OpEntry>,
}
23
/// Bookkeeping record for one operation in the running or pending list.
#[derive(Clone)]
struct OpEntry {
    // Identifier shared with the `OpGuard` handed to the caller.
    id: String,
    // Whether the operation is a read or a write.
    kind: OpKind,
    // Taken when `acquire` creates the entry, so the elapsed time reported
    // by `snapshot` includes any time spent pending.
    started: Instant,
    // Caller-supplied text, copied into snapshots.
    description: String,
}
31
32impl Default for OpQueue {
33    fn default() -> Self {
34        Self::new()
35    }
36}
37
38impl OpQueue {
39    pub fn new() -> Self {
40        Self {
41            inner: Arc::new(RwLock::new(OpQueueInner {
42                running: Vec::new(),
43                pending: Vec::new(),
44            })),
45        }
46    }
47
48    pub fn acquire(&self, kind: OpKind, description: String) -> OpGuard {
49        let id = uuid::Uuid::new_v4().to_string();
50        {
51            let mut inner = self.inner.write();
52            let has_write = inner.running.iter().any(|e| e.kind == OpKind::Write);
53            let has_read_write = if kind == OpKind::Write {
54                !inner.running.is_empty()
55            } else {
56                false
57            };
58            if has_write || has_read_write {
59                inner.pending.push(OpEntry {
60                    id: id.clone(),
61                    kind,
62                    started: Instant::now(),
63                    description: description.clone(),
64                });
65            } else {
66                inner.running.push(OpEntry {
67                    id: id.clone(),
68                    kind,
69                    started: Instant::now(),
70                    description: description.clone(),
71                });
72            }
73        }
74        OpGuard {
75            queue: self.inner.clone(),
76            id,
77            kind,
78        }
79    }
80
81    pub fn wait_for_turn(&self, guard: &OpGuard) {
82        loop {
83            let inner = self.inner.read();
84            let is_in_running = inner.running.iter().any(|e| e.id == guard.id);
85            if is_in_running {
86                return;
87            }
88            let pos = inner.pending.iter().position(|e| e.id == guard.id);
89            if let Some(idx) = pos {
90                let front_is_write = inner
91                    .pending
92                    .first()
93                    .map(|e| e.kind == OpKind::Write)
94                    .unwrap_or(false);
95                let is_read = guard.kind == OpKind::Read;
96                if idx == 0 && (!front_is_write || !is_read) {
97                    drop(inner);
98                    let mut inner = self.inner.write();
99                    if let Some(entry) = inner
100                        .pending
101                        .iter()
102                        .position(|e| e.id == guard.id)
103                        .map(|i| inner.pending.remove(i))
104                    {
105                        inner.running.push(entry);
106                    }
107                    return;
108                }
109            }
110            drop(inner);
111            std::thread::sleep(Duration::from_millis(10));
112        }
113    }
114
115    pub fn snapshot(&self) -> OpsSnapshot {
116        let inner = self.inner.read();
117        OpsSnapshot {
118            running: inner
119                .running
120                .iter()
121                .map(|e| OpSnapshotEntry {
122                    id: e.id.clone(),
123                    kind: e.kind,
124                    elapsed_ms: e.started.elapsed().as_millis() as u64,
125                    description: e.description.clone(),
126                })
127                .collect(),
128            pending: inner.pending.len() as u64,
129        }
130    }
131}
132
/// Handle for one operation registered with `OpQueue::acquire`.
///
/// Dropping the guard runs the `Drop` impl in this file, which removes the
/// operation from the running set and promotes pending operations.
pub struct OpGuard {
    // Shared state of the queue that issued this guard.
    queue: Arc<RwLock<OpQueueInner>>,
    /// Identifier of the operation; `acquire` fills it with a v4 UUID string.
    pub id: String,
    /// Kind the operation was registered with.
    pub kind: OpKind,
}
138
139impl Drop for OpGuard {
140    fn drop(&mut self) {
141        let mut inner = self.queue.write();
142        inner.running.retain(|e| e.id != self.id);
143        promote_pending(&mut inner);
144    }
145}
146
147fn promote_pending(inner: &mut OpQueueInner) {
148    while let Some(entry) = inner.pending.first() {
149        let has_write = inner.running.iter().any(|e| e.kind == OpKind::Write);
150        let can_advance = match entry.kind {
151            OpKind::Read => !has_write,
152            OpKind::Write => inner.running.is_empty(),
153        };
154        if can_advance {
155            let e = inner.pending.remove(0);
156            inner.running.push(e);
157        } else {
158            break;
159        }
160    }
161}
162
/// Serializable view of a queue as produced by `OpQueue::snapshot`.
#[derive(serde::Serialize)]
pub struct OpsSnapshot {
    /// One entry per operation in the running set at snapshot time.
    pub running: Vec<OpSnapshotEntry>,
    /// Number of operations in the pending list at snapshot time.
    pub pending: u64,
}
168
/// Serializable description of a single running operation.
#[derive(serde::Serialize)]
pub struct OpSnapshotEntry {
    /// Identifier of the operation.
    pub id: String,
    /// Whether the operation is a read or a write.
    pub kind: OpKind,
    /// Milliseconds elapsed since the operation's entry was created.
    pub elapsed_ms: u64,
    /// Description supplied when the operation was acquired.
    pub description: String,
}
176
177impl std::fmt::Display for OpKind {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        match self {
180            OpKind::Read => write!(f, "read"),
181            OpKind::Write => write!(f, "write"),
182        }
183    }
184}
185
/// Registry holding one `OpQueue` per repository path.
#[derive(Default)]
pub struct RepoOpQueues {
    /// Queues keyed by repository path; entries are added by `get_or_create`.
    pub repos: RwLock<HashMap<PathBuf, OpQueue>>,
}
190
191impl RepoOpQueues {
192    pub fn new() -> Self {
193        Self::default()
194    }
195
196    pub fn get_or_create(&self, repo: &PathBuf) -> OpQueue {
197        let map = self.repos.read();
198        if let Some(q) = map.get(repo) {
199            return q.clone();
200        }
201        drop(map);
202        let mut map = self.repos.write();
203        map.entry(repo.clone()).or_default().clone()
204    }
205}
206
// Per-thread trace id used by `traced_info` / `traced_warn` to prefix log
// lines. Every thread starts with an empty string; `set_trace_id` overwrites
// the value and `get_trace_id` returns a copy of it.
thread_local! {
    pub static CURRENT_TRACE_ID: std::cell::RefCell<String> = const { std::cell::RefCell::new(String::new()) };
}
210
211pub fn generate_trace_id() -> String {
212    uuid::Uuid::new_v4().to_string()
213}
214
215pub fn set_trace_id(trace_id: &str) {
216    CURRENT_TRACE_ID.with(|cell| {
217        *cell.borrow_mut() = trace_id.to_string();
218    });
219}
220
221pub fn get_trace_id() -> String {
222    CURRENT_TRACE_ID.with(|cell| cell.borrow().clone())
223}
224
225pub fn traced_info(msg: impl std::fmt::Display) {
226    let tid = get_trace_id();
227    if tid.is_empty() {
228        info!("{}", msg);
229    } else {
230        info!("[{}] {}", tid, msg);
231    }
232}
233
234pub fn traced_warn(msg: impl std::fmt::Display) {
235    let tid = get_trace_id();
236    if tid.is_empty() {
237        warn!("{}", msg);
238    } else {
239        warn!("[{}] {}", tid, msg);
240    }
241}