1use std::collections::HashMap;
4use std::fs;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::{SystemTime, UNIX_EPOCH};
8
9use parking_lot::RwLock;
10
11use crate::cycle::{FastDaily, RollCycle};
12use crate::error::{Error, Result};
13use crate::reader::Reader;
14use crate::store::{Store, StoreConfig};
15use crate::writer::Writer;
16
/// Extension (without the leading dot) carried by every store file in a queue
/// directory. It is appended when building store filenames and used to filter
/// directory entries when scanning for existing cycles.
pub const FILE_EXTENSION: &str = "cq4";
19
20pub struct QueueBuilder {
34 path: PathBuf,
35 roll_cycle: Arc<dyn RollCycle>,
36 block_size: usize,
37 index_spacing: u32,
38 index_count: u32,
39 epoch: u64,
40 read_only: bool,
41 enable_checksums: bool,
42}
43
44impl QueueBuilder {
45 pub fn new(path: impl Into<PathBuf>) -> Self {
47 Self {
48 path: path.into(),
49 roll_cycle: Arc::new(FastDaily),
50 block_size: 64 * 1024 * 1024,
51 index_spacing: 256,
52 index_count: 4096,
53 epoch: 0,
54 read_only: false,
55 enable_checksums: false,
56 }
57 }
58
59 pub fn roll_cycle<R: RollCycle + 'static>(mut self, cycle: R) -> Self {
61 self.roll_cycle = Arc::new(cycle);
62 self
63 }
64
65 pub fn block_size(mut self, size: usize) -> Self {
67 self.block_size = size;
68 self
69 }
70
71 pub fn index_spacing(mut self, spacing: u32) -> Self {
74 self.index_spacing = spacing.next_power_of_two();
75 self
76 }
77
78 pub fn index_count(mut self, count: u32) -> Self {
81 self.index_count = count.next_power_of_two();
82 self
83 }
84
85 pub fn epoch(mut self, epoch: u64) -> Self {
87 self.epoch = epoch;
88 self
89 }
90
91 pub fn read_only(mut self, read_only: bool) -> Self {
93 self.read_only = read_only;
94 self
95 }
96
97 pub fn checksums(mut self, enable: bool) -> Self {
103 self.enable_checksums = enable;
104 self
105 }
106
107 pub fn build(self) -> Result<Queue> {
109 Queue::open(self)
110 }
111}
112
113pub struct Queue {
135 path: PathBuf,
136 roll_cycle: Arc<dyn RollCycle>,
137 block_size: usize,
138 index_spacing: u32,
139 index_count: u32,
140 epoch: u64,
141 read_only: bool,
142 enable_checksums: bool,
143 store_cache: RwLock<HashMap<i32, Arc<Store>>>,
144 directory_listing: RwLock<Vec<i32>>,
145}
146
147impl Queue {
148 #[must_use]
150 pub fn new(path: impl Into<PathBuf>) -> QueueBuilder {
151 QueueBuilder::new(path)
152 }
153
154 fn open(builder: QueueBuilder) -> Result<Self> {
156 if !builder.path.exists() {
157 if builder.read_only {
158 return Err(Error::StoreFileMissing(builder.path));
159 }
160 fs::create_dir_all(&builder.path)
161 .map_err(|_| Error::DirectoryCreationFailed(builder.path.clone()))?;
162 }
163
164 let queue = Self {
165 path: builder.path,
166 roll_cycle: builder.roll_cycle,
167 block_size: builder.block_size,
168 index_spacing: builder.index_spacing,
169 index_count: builder.index_count,
170 epoch: builder.epoch,
171 read_only: builder.read_only,
172 enable_checksums: builder.enable_checksums,
173 store_cache: RwLock::new(HashMap::new()),
174 directory_listing: RwLock::new(Vec::new()),
175 };
176
177 queue.refresh_directory_listing()?;
178 Ok(queue)
179 }
180
181 pub fn writer(&self) -> Result<Writer<'_>> {
187 if self.read_only {
188 return Err(Error::ReadOnly);
189 }
190 Writer::new(self)
191 }
192
193 pub fn reader(&self) -> Result<Reader<'_>> {
195 Reader::new(self)
196 }
197
198 #[must_use]
200 pub fn path(&self) -> &Path {
201 &self.path
202 }
203
204 #[must_use]
206 pub fn roll_cycle(&self) -> &dyn RollCycle {
207 self.roll_cycle.as_ref()
208 }
209
210 #[must_use]
212 pub fn block_size(&self) -> usize {
213 self.block_size
214 }
215
216 #[must_use]
218 pub fn epoch(&self) -> u64 {
219 self.epoch
220 }
221
222 #[must_use]
224 pub fn is_read_only(&self) -> bool {
225 self.read_only
226 }
227
228 #[must_use]
230 pub fn checksums_enabled(&self) -> bool {
231 self.enable_checksums
232 }
233
234 #[must_use]
236 pub fn index_spacing(&self) -> u32 {
237 self.index_spacing
238 }
239
240 #[must_use]
242 pub fn index_count(&self) -> u32 {
243 self.index_count
244 }
245
246 #[must_use]
248 pub fn first_cycle(&self) -> Option<i32> {
249 self.directory_listing.read().first().copied()
250 }
251
252 #[must_use]
254 pub fn last_cycle(&self) -> Option<i32> {
255 self.directory_listing.read().last().copied()
256 }
257
258 #[must_use]
260 pub fn current_cycle(&self) -> i32 {
261 let now = SystemTime::now()
262 .duration_since(UNIX_EPOCH)
263 .map(|d| d.as_millis() as u64)
264 .unwrap_or(0);
265 self.roll_cycle.current_cycle(now, self.epoch)
266 }
267
268 pub(crate) fn acquire_store(&self, cycle: i32) -> Result<Arc<Store>> {
270 {
271 let cache = self.store_cache.read();
272 if let Some(store) = cache.get(&cycle) {
273 return Ok(Arc::clone(store));
274 }
275 }
276
277 let store = Arc::new(self.create_store(cycle)?);
278
279 {
280 let mut cache = self.store_cache.write();
281 cache.insert(cycle, Arc::clone(&store));
282 }
283
284 {
285 let mut listing = self.directory_listing.write();
286 if !listing.contains(&cycle) {
287 listing.push(cycle);
288 listing.sort_unstable();
289 }
290 }
291
292 Ok(store)
293 }
294
295 fn create_store(&self, cycle: i32) -> Result<Store> {
297 let filename = format!(
298 "{}.{}",
299 self.roll_cycle.format_cycle(cycle, self.epoch),
300 FILE_EXTENSION
301 );
302 let config = StoreConfig {
303 block_size: self.block_size,
304 enable_checksums: self.enable_checksums,
305 ..StoreConfig::default()
306 };
307 Store::open(self.path.join(filename), cycle, config, self.read_only)
308 }
309
310 #[must_use]
312 pub fn filename_for_cycle(&self, cycle: i32) -> String {
313 format!(
314 "{}.{}",
315 self.roll_cycle.format_cycle(cycle, self.epoch),
316 FILE_EXTENSION
317 )
318 }
319
320 fn refresh_directory_listing(&self) -> Result<()> {
322 let mut cycles = Vec::new();
323
324 if let Ok(entries) = fs::read_dir(&self.path) {
325 for entry in entries.flatten() {
326 let path = entry.path();
327 if path.extension().is_some_and(|ext| ext == FILE_EXTENSION) {
328 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
329 if let Some(cycle) = self.roll_cycle.parse_cycle(stem, self.epoch) {
330 cycles.push(cycle);
331 }
332 }
333 }
334 }
335 }
336
337 cycles.sort_unstable();
338
339 *self.directory_listing.write() = cycles;
340 Ok(())
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A fresh builder carries the documented default settings.
    #[test]
    fn test_builder_defaults() {
        let QueueBuilder {
            block_size,
            index_spacing,
            index_count,
            epoch,
            read_only,
            enable_checksums,
            ..
        } = QueueBuilder::new("/tmp/test");

        assert_eq!(block_size, 64 * 1024 * 1024);
        assert_eq!(index_spacing, 256);
        assert_eq!(index_count, 4096);
        assert_eq!(epoch, 0);
        assert!(!read_only);
        assert!(!enable_checksums);
    }

    /// The checksum flag set on the builder is visible on the queue.
    #[test]
    fn test_checksums_enabled() {
        let dir = TempDir::new().unwrap();
        let builder = Queue::new(dir.path()).checksums(true);
        let queue = builder.build().unwrap();

        assert!(queue.checksums_enabled());
    }

    /// Building with defaults yields a writable queue whose directory exists.
    #[test]
    fn test_creation() {
        let dir = TempDir::new().unwrap();
        let queue = Queue::new(dir.path()).build().unwrap();

        assert!(queue.path().exists());
        assert!(!queue.is_read_only());
    }

    /// With the default roll cycle and epoch, cycle 0 maps to the Unix epoch date.
    #[test]
    fn test_filename_for_cycle() {
        let dir = TempDir::new().unwrap();
        let queue = Queue::new(dir.path()).build().unwrap();

        let name = queue.filename_for_cycle(0);
        assert_eq!(name, "19700101.cq4");
    }

    /// Two independent readers each observe every written message, in order.
    #[test]
    fn test_single_producer_multiple_consumer() {
        let dir = TempDir::new().unwrap();
        let queue = Queue::new(dir.path()).build().unwrap();

        let mut writer = queue.writer().unwrap();

        let mut messages: Vec<Vec<u8>> = Vec::new();
        for i in 0..10 {
            messages.push(format!("message-{i}").into_bytes());
        }

        for payload in &messages {
            writer.write(payload).unwrap();
        }

        let mut r1 = queue.reader().unwrap();
        r1.rewind().unwrap();

        let mut r2 = queue.reader().unwrap();
        r2.rewind().unwrap();

        for expected in &messages {
            let from_r1 = r1.read().unwrap().expect("r1 should have data");
            assert_eq!(from_r1.as_ref(), expected.as_slice());

            let from_r2 = r2.read().unwrap().expect("r2 should have data");
            assert_eq!(from_r2.as_ref(), expected.as_slice());
        }

        // Both readers are exhausted once every message has been consumed.
        assert!(r1.read().unwrap().is_none());
        assert!(r2.read().unwrap().is_none());
    }
}