1use parking_lot::RwLock;
2use std::collections::HashMap;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6use tracing::{info, warn};
7
8#[derive(Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
9pub enum OpKind {
10 Read,
11 Write,
12}
13
14#[derive(Clone)]
15pub struct OpQueue {
16 inner: Arc<RwLock<OpQueueInner>>,
17}
18
19struct OpQueueInner {
20 running: Vec<OpEntry>,
21 pending: Vec<OpEntry>,
22}
23
24#[derive(Clone)]
25struct OpEntry {
26 id: String,
27 kind: OpKind,
28 started: Instant,
29 description: String,
30}
31
32impl Default for OpQueue {
33 fn default() -> Self {
34 Self::new()
35 }
36}
37
38impl OpQueue {
39 pub fn new() -> Self {
40 Self {
41 inner: Arc::new(RwLock::new(OpQueueInner {
42 running: Vec::new(),
43 pending: Vec::new(),
44 })),
45 }
46 }
47
48 pub fn acquire(&self, kind: OpKind, description: String) -> OpGuard {
49 let id = uuid::Uuid::new_v4().to_string();
50 {
51 let mut inner = self.inner.write();
52 let has_write = inner.running.iter().any(|e| e.kind == OpKind::Write);
53 let has_read_write = if kind == OpKind::Write {
54 !inner.running.is_empty()
55 } else {
56 false
57 };
58 if has_write || has_read_write {
59 inner.pending.push(OpEntry {
60 id: id.clone(),
61 kind,
62 started: Instant::now(),
63 description: description.clone(),
64 });
65 } else {
66 inner.running.push(OpEntry {
67 id: id.clone(),
68 kind,
69 started: Instant::now(),
70 description: description.clone(),
71 });
72 }
73 }
74 OpGuard {
75 queue: self.inner.clone(),
76 id,
77 kind,
78 }
79 }
80
81 pub fn wait_for_turn(&self, guard: &OpGuard) {
82 loop {
83 let inner = self.inner.read();
84 let is_in_running = inner.running.iter().any(|e| e.id == guard.id);
85 if is_in_running {
86 return;
87 }
88 let pos = inner.pending.iter().position(|e| e.id == guard.id);
89 if let Some(idx) = pos {
90 let front_is_write = inner
91 .pending
92 .first()
93 .map(|e| e.kind == OpKind::Write)
94 .unwrap_or(false);
95 let is_read = guard.kind == OpKind::Read;
96 if idx == 0 && (!front_is_write || !is_read) {
97 drop(inner);
98 let mut inner = self.inner.write();
99 if let Some(entry) = inner
100 .pending
101 .iter()
102 .position(|e| e.id == guard.id)
103 .map(|i| inner.pending.remove(i))
104 {
105 inner.running.push(entry);
106 }
107 return;
108 }
109 }
110 drop(inner);
111 std::thread::sleep(Duration::from_millis(10));
112 }
113 }
114
115 pub fn snapshot(&self) -> OpsSnapshot {
116 let inner = self.inner.read();
117 OpsSnapshot {
118 running: inner
119 .running
120 .iter()
121 .map(|e| OpSnapshotEntry {
122 id: e.id.clone(),
123 kind: e.kind,
124 elapsed_ms: e.started.elapsed().as_millis() as u64,
125 description: e.description.clone(),
126 })
127 .collect(),
128 pending: inner.pending.len() as u64,
129 }
130 }
131}
132
133pub struct OpGuard {
134 queue: Arc<RwLock<OpQueueInner>>,
135 pub id: String,
136 pub kind: OpKind,
137}
138
139impl Drop for OpGuard {
140 fn drop(&mut self) {
141 let mut inner = self.queue.write();
142 inner.running.retain(|e| e.id != self.id);
143 promote_pending(&mut inner);
144 }
145}
146
147fn promote_pending(inner: &mut OpQueueInner) {
148 while let Some(entry) = inner.pending.first() {
149 let has_write = inner.running.iter().any(|e| e.kind == OpKind::Write);
150 let can_advance = match entry.kind {
151 OpKind::Read => !has_write,
152 OpKind::Write => inner.running.is_empty(),
153 };
154 if can_advance {
155 let e = inner.pending.remove(0);
156 inner.running.push(e);
157 } else {
158 break;
159 }
160 }
161}
162
163#[derive(serde::Serialize)]
164pub struct OpsSnapshot {
165 pub running: Vec<OpSnapshotEntry>,
166 pub pending: u64,
167}
168
169#[derive(serde::Serialize)]
170pub struct OpSnapshotEntry {
171 pub id: String,
172 pub kind: OpKind,
173 pub elapsed_ms: u64,
174 pub description: String,
175}
176
177impl std::fmt::Display for OpKind {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 match self {
180 OpKind::Read => write!(f, "read"),
181 OpKind::Write => write!(f, "write"),
182 }
183 }
184}
185
186#[derive(Default)]
187pub struct RepoOpQueues {
188 pub repos: RwLock<HashMap<PathBuf, OpQueue>>,
189}
190
191impl RepoOpQueues {
192 pub fn new() -> Self {
193 Self::default()
194 }
195
196 pub fn get_or_create(&self, repo: &PathBuf) -> OpQueue {
197 let map = self.repos.read();
198 if let Some(q) = map.get(repo) {
199 return q.clone();
200 }
201 drop(map);
202 let mut map = self.repos.write();
203 map.entry(repo.clone()).or_default().clone()
204 }
205}
206
thread_local! {
    /// Trace id of the current thread; the empty string means none is set.
    pub static CURRENT_TRACE_ID: std::cell::RefCell<String> = const {
        std::cell::RefCell::new(String::new())
    };
}
210
211pub fn generate_trace_id() -> String {
212 uuid::Uuid::new_v4().to_string()
213}
214
215pub fn set_trace_id(trace_id: &str) {
216 CURRENT_TRACE_ID.with(|cell| {
217 *cell.borrow_mut() = trace_id.to_string();
218 });
219}
220
221pub fn get_trace_id() -> String {
222 CURRENT_TRACE_ID.with(|cell| cell.borrow().clone())
223}
224
225pub fn traced_info(msg: impl std::fmt::Display) {
226 let tid = get_trace_id();
227 if tid.is_empty() {
228 info!("{}", msg);
229 } else {
230 info!("[{}] {}", tid, msg);
231 }
232}
233
234pub fn traced_warn(msg: impl std::fmt::Display) {
235 let tid = get_trace_id();
236 if tid.is_empty() {
237 warn!("{}", msg);
238 } else {
239 warn!("[{}] {}", tid, msg);
240 }
241}