#![doc = include_str!("../README.md")]

pub mod args;
pub mod compaction;
mod topic_file_op;

use async_stream::stream;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::{Buf, BytesMut};
use chrono::{DateTime, Utc};
use compaction::{CompactionStrategy, Compactor, ScopedTopicSubscriber, TopicStorageOps};
use crc::{Crc, CRC_32_ISCSI};
use log::{error, trace, warn};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TimestampSecondsWithFrac};
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::slice;
use std::sync::{Arc, Mutex};
use std::{
    collections::{hash_map::Entry, HashMap, VecDeque},
    path::{Path, PathBuf},
    pin::Pin,
    time::Duration,
};
use streambed::commit_log::{
    CommitLog, ConsumerOffset, ConsumerRecord, Header, HeaderKey, Offset, PartitionOffsets,
    ProducedOffset, ProducerError, ProducerRecord, Subscription, Topic,
};
use streambed::commit_log::{Key, Partition};
use tokio::{
    sync::{mpsc, oneshot},
    time,
};
use tokio_stream::Stream;
use tokio_util::codec::Decoder;
use topic_file_op::TopicFileOp;

use crate::topic_file_op::TopicFileOpError;

const COMPACTOR_QUEUE_SIZE: usize = 10;
const COMPACTOR_WRITE_POLL: Duration = Duration::from_millis(10);
const CONSUMER_QUEUE_SIZE: usize = 10;
static CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

const PRODUCER_QUEUE_SIZE: usize = 10;
const TOPIC_FILE_CONSUMER_POLL: Duration = Duration::from_secs(1);
const TOPIC_FILE_PRODUCER_FLUSH: Duration = Duration::from_millis(10);

type ProduceReply = Result<ProducedOffset, ProducerError>;
type ProduceRequest = (ProducerRecord, oneshot::Sender<ProduceReply>);
type ShareableTopicMap<T> = Arc<Mutex<HashMap<Topic, T>>>;

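/// A [`CommitLog`] implementation that uses the file system as its store,
/// keeping one active file per topic plus any history files produced by
/// compaction. Cloning is cheap: clones share the same producer,
/// compactor, and file-handle state.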
#[derive(Clone)]
pub struct FileLog {
    compactor_txs: ShareableTopicMap<mpsc::Sender<u64>>,
    compaction_threshold_size: u64,
    compaction_write_buffer_size: usize,
    max_record_size: usize,
    read_buffer_size: usize,
    producer_txs: ShareableTopicMap<mpsc::Sender<ProduceRequest>>,
    pub(crate) topic_file_ops: ShareableTopicMap<TopicFileOp>,
    root_path: PathBuf,
    write_buffer_size: usize,
}

#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
pub struct StorableHeader {
    key: HeaderKey,
    value: Vec<u8>,
}

#[serde_as]
#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
struct StorableRecord {
    version: u32,
    headers: Vec<StorableHeader>,
    #[serde_as(as = "Option<TimestampSecondsWithFrac>")]
    timestamp: Option<DateTime<Utc>>,
    key: u64,
    value: Vec<u8>,
    offset: u64,
}

#[derive(Debug)]
pub struct CompactionRegistrationError;

impl FileLog {
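    /// Create a `FileLog` rooted at `root_path`, using default sizes for
    /// the compaction threshold, the read/write buffers, and the maximum
    /// record size.
    ///
    /// A minimal usage sketch; the root directory is illustrative and the
    /// crate is assumed to be linked as `streambed_logged`:
    ///
    /// ```no_run
    /// use streambed_logged::FileLog;
    ///
    /// let cl = FileLog::new("/tmp/logged");
    /// ```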
    pub fn new<P>(root_path: P) -> Self
    where
        P: Into<PathBuf>,
    {
        Self::with_config(root_path, 64 * 1024, 8192, 64 * 1024, 8 * 1024, 16 * 1024)
    }

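    /// Create a `FileLog` with explicit sizing: `compaction_threshold_size`
    /// is the active file size in bytes beyond which compaction is
    /// triggered, `read_buffer_size` and `write_buffer_size` size the
    /// buffers used for reading and appending topic files,
    /// `compaction_write_buffer_size` does the same for compaction work
    /// files, and `max_record_size` bounds how many bytes the decoder will
    /// accumulate for a single record before treating the file as corrupt.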
    pub fn with_config<P>(
        root_path: P,
        compaction_threshold_size: u64,
        read_buffer_size: usize,
        compaction_write_buffer_size: usize,
        write_buffer_size: usize,
        max_record_size: usize,
    ) -> Self
    where
        P: Into<PathBuf>,
    {
        Self {
            compactor_txs: Arc::new(Mutex::new(HashMap::new())),
            compaction_threshold_size,
            compaction_write_buffer_size,
            max_record_size,
            read_buffer_size,
            root_path: root_path.into(),
            producer_txs: Arc::new(Mutex::new(HashMap::new())),
            topic_file_ops: Arc::new(Mutex::new(HashMap::new())),
            write_buffer_size,
        }
    }

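    /// Release the producer task and cached file state held for `topic`.
    /// A subsequent produce or subscription re-opens the topic lazily.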
    pub fn close_topic(&mut self, topic: &Topic) {
        if let Ok(mut locked_producer_txs) = self.producer_txs.lock() {
            locked_producer_txs.remove(topic);
        }
        if let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() {
            locked_topic_file_ops.remove(topic);
        }
    }

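    /// Register a compaction strategy for `topic`. Registration spawns a
    /// background task that compacts the topic's files whenever the active
    /// file grows beyond the configured threshold; registering again for
    /// the same topic replaces the previous strategy.
    ///
    /// A sketch using the key-based retention strategy from the
    /// [`compaction`] module; retaining a single record per key here is
    /// illustrative, and the crate is assumed to be linked as
    /// `streambed_logged`:
    ///
    /// ```no_run
    /// # use streambed::commit_log::Topic;
    /// # use streambed_logged::{compaction, FileLog};
    /// # async fn example() {
    /// let mut cl = FileLog::new("/tmp/logged");
    /// cl.register_compaction(
    ///     Topic::from("my-topic"),
    ///     compaction::KeyBasedRetention::new(1),
    /// )
    /// .await
    /// .unwrap();
    /// # }
    /// ```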
    pub async fn register_compaction<CS>(
        &mut self,
        topic: Topic,
        compaction_strategy: CS,
    ) -> Result<(), CompactionRegistrationError>
    where
        CS: CompactionStrategy + Send + Sync + 'static,
    {
        let topic_file_op = {
            let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                return Err(CompactionRegistrationError);
            };
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops)
        };

        let mut age_active_file_topic_file_op = topic_file_op.clone();
        let age_active_file_read_buffer_size = self.read_buffer_size;
        let age_active_file_max_record_size = self.max_record_size;
        let new_work_file_topic_file_op = topic_file_op.clone();
        let recover_history_files_topic_file_op = topic_file_op.clone();
        let replace_history_files_topic_file_op = topic_file_op;

        let compaction_write_buffer_size = self.compaction_write_buffer_size;

        let mut compactor = Compactor::new(
            compaction_strategy,
            self.compaction_threshold_size,
            ScopedTopicSubscriber::new(self.clone(), topic.clone()),
            TopicStorageOps::new(
                move || {
                    age_active_file_topic_file_op.age_active_file()?;
                    find_offset(
                        &age_active_file_topic_file_op,
                        age_active_file_read_buffer_size,
                        age_active_file_max_record_size,
                        true,
                    )
                    .map(|o| o.map(|o| o.end_offset))
                    .map_err(TopicFileOpError::IoError)
                },
                move || new_work_file_topic_file_op.new_work_file(compaction_write_buffer_size),
                move || recover_history_files_topic_file_op.recover_history_files(),
                move || replace_history_files_topic_file_op.replace_history_files(),
            ),
        );

        let (compactor_tx, mut compactor_rx) = mpsc::channel::<u64>(COMPACTOR_QUEUE_SIZE);

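        // The compactor task: wait for notifications of the active file's
        // size from the producer, stepping the compactor each time. While a
        // compaction is in flight, wait only COMPACTOR_WRITE_POLL for the
        // next notification so that compaction keeps progressing even when
        // no further writes arrive.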
        tokio::spawn(async move {
            let mut recv = compactor_rx.recv().await;
            while let Some(active_file_size) = recv {
                compactor.step(active_file_size).await;
                if compactor.is_idle() {
                    recv = compactor_rx.recv().await;
                } else if let Ok(r) = time::timeout(COMPACTOR_WRITE_POLL, compactor_rx.recv()).await
                {
                    recv = r;
                }
            }
        });

        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.insert(topic, compactor_tx);
        }

        Ok(())
    }

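    /// Remove the compaction registration for `topic`. The topic's
    /// background compaction task stops once any in-flight step completes.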
    pub fn unregister_compaction(&mut self, topic: &Topic) {
        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.remove(topic);
        }
    }
}

#[async_trait]
impl CommitLog for FileLog {
    async fn offsets(&self, topic: Topic, _partition: Partition) -> Option<PartitionOffsets> {
        let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
            return None;
        };
        let topic_file_op =
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops);
        drop(locked_topic_file_ops);

        find_offset(
            &topic_file_op,
            self.read_buffer_size,
            self.max_record_size,
            false,
        )
        .ok()
        .flatten()
    }

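    // The first produce for a topic spawns a dedicated producer task that
    // owns all writes to that topic's active file; subsequent produces just
    // send the record and a reply channel to it. If the topic file cannot
    // be read cleanly at startup, recovery is attempted by truncating the
    // active file back to its last complete record.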
    async fn produce(&self, record: ProducerRecord) -> ProduceReply {
        let topic_producer = {
            let Ok(mut locked_producer_map) = self.producer_txs.lock() else {
                return Err(ProducerError::CannotProduce);
            };
            if let Some(topic_producer) = locked_producer_map.get(&record.topic) {
                let producer_tx = topic_producer.clone();
                drop(locked_producer_map);
                producer_tx
            } else {
                let (producer_tx, mut producer_rx) =
                    mpsc::channel::<ProduceRequest>(PRODUCER_QUEUE_SIZE);
                locked_producer_map.insert(record.topic.clone(), producer_tx.clone());
                drop(locked_producer_map);

                let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                    return Err(ProducerError::CannotProduce);
                };
                let mut topic_file_op = acquire_topic_file_ops(
                    &self.root_path,
                    &record.topic,
                    &mut locked_topic_file_ops,
                );
                drop(locked_topic_file_ops);

                let found_offsets = match find_offset(
                    &topic_file_op,
                    self.read_buffer_size,
                    self.max_record_size,
                    false,
                ) {
                    r @ Ok(_) => r,
                    Err(e) => {
                        error!("Error {e} when producing. Attempting to recover by truncating the active file.");

                        if let Err(e) = recover_active_file(
                            &mut topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                        ) {
299 error!("Error {e} when recoverying. Unable to recover the active file.")
                        }

                        find_offset(
                            &topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                            false,
                        )
                    }
                };
                let mut next_offset = found_offsets
                    .map(|offsets| offsets.map_or(0, |offsets| offsets.end_offset.wrapping_add(1)));

                let task_root_path = self.root_path.clone();
                let task_compactor_txs = self.compactor_txs.clone();
                let task_topic_file_ops = self.topic_file_ops.clone();
                let task_write_buffer_size = self.write_buffer_size;

                let mut open_options = fs::OpenOptions::new();
                open_options.append(true).create(true);

                let mut file_size = topic_file_op
                    .active_file_size(&open_options, task_write_buffer_size)
                    .unwrap_or_default();

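                // The producer task: encode each record with postcard's
                // CRC32 framing and append it to the active file, debouncing
                // flushes. The file is flushed only once
                // TOPIC_FILE_PRODUCER_FLUSH elapses with no new record
                // arriving.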
                tokio::spawn({
                    async move {
                        let mut recv = producer_rx.recv().await;
                        while let Some((record, reply_to)) = recv {
                            if let Ok(next_offset) = &mut next_offset {
                                let topic_file_op = {
                                    if let Ok(mut locked_topic_file_ops) =
                                        task_topic_file_ops.lock()
                                    {
                                        Some(acquire_topic_file_ops(
                                            &task_root_path,
                                            &record.topic,
                                            &mut locked_topic_file_ops,
                                        ))
                                    } else {
                                        None
                                    }
                                };
                                if let Some(mut topic_file_op) = topic_file_op {
                                    let r = topic_file_op.with_active_file(
                                        &open_options,
                                        task_write_buffer_size,
                                        |file| {
                                            let storable_record = StorableRecord {
                                                version: 0,
                                                headers: record
                                                    .headers
                                                    .into_iter()
                                                    .map(|h| StorableHeader {
                                                        key: h.key,
                                                        value: h.value,
                                                    })
                                                    .collect(),
                                                timestamp: record.timestamp,
                                                key: record.key,
                                                value: record.value,
                                                offset: *next_offset,
                                            };

                                            trace!("Producing record: {:?}", storable_record);

                                            if let Ok(buf) = postcard::to_stdvec_crc32(
                                                &storable_record,
                                                CRC.digest(),
                                            ) {
                                                file.write_all(&buf)
                                                    .map_err(TopicFileOpError::IoError)
                                                    .map(|_| buf.len())
                                            } else {
                                                Err(TopicFileOpError::CannotSerialize)
                                            }
                                        },
                                    );

                                    if let Ok((bytes_written, is_new_active_file)) = r {
                                        let _ = reply_to.send(Ok(ProducedOffset {
                                            offset: *next_offset,
                                        }));

                                        *next_offset = next_offset.wrapping_add(1);

                                        if is_new_active_file {
                                            file_size = 0;
                                        }
                                        file_size = file_size.wrapping_add(bytes_written as u64);

                                        let compactor_tx = {
                                            if let Ok(locked_task_compactor_txs) =
                                                task_compactor_txs.lock()
                                            {
                                                locked_task_compactor_txs
                                                    .get(&record.topic)
                                                    .cloned()
                                            } else {
                                                None
                                            }
                                        };
                                        if let Some(compactor_tx) = compactor_tx {
                                            let _ = compactor_tx.send(file_size).await;
                                        }

                                        match time::timeout(
                                            TOPIC_FILE_PRODUCER_FLUSH,
                                            producer_rx.recv(),
                                        )
                                        .await
                                        {
                                            Ok(r) => recv = r,
                                            Err(_) => {
                                                let _ = topic_file_op.flush_active_file();
                                                recv = producer_rx.recv().await;
                                            }
                                        }

                                        continue;
                                    }
                                }
                            }

                            let _ = reply_to.send(Err(ProducerError::CannotProduce));
                            recv = producer_rx.recv().await;
                        }
                    }
                });

                producer_tx
            }
        };

        let (reply_tx, reply_rx) = oneshot::channel();
        if topic_producer.send((record, reply_tx)).await.is_ok() {
            if let Ok(reply) = reply_rx.await {
                reply
            } else {
                Err(ProducerError::CannotProduce)
            }
        } else {
            Err(ProducerError::CannotProduce)
        }
    }

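    // Subscribing spawns one polling task per subscribed topic. Each task
    // tails the topic's history and active files, re-reading them from the
    // start whenever the files' modification time advances and emitting
    // only records beyond the consumer's last-seen offset.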
    fn scoped_subscribe<'a>(
        &'a self,
        _consumer_group_name: &str,
        offsets: Vec<ConsumerOffset>,
        subscriptions: Vec<Subscription>,
        idle_timeout: Option<Duration>,
    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        let offsets = offsets
            .iter()
            .map(|e| {
                assert_eq!(e.partition, 0);
                (e.topic.to_owned(), e.offset)
            })
            .collect::<HashMap<Topic, u64>>();

        let (tx, mut rx) = mpsc::channel(CONSUMER_QUEUE_SIZE);

        let mut open_options = OpenOptions::new();
        open_options.read(true);

        for s in subscriptions {
            let task_root_path = self.root_path.clone();
            let task_topic = s.topic.clone();
            let mut task_offset = offsets.get(&s.topic).copied();
            let task_tx = tx.clone();
            let task_read_buffer_size = self.read_buffer_size;
            let task_max_record_size = self.max_record_size;
            let task_topic_file_ops = self.topic_file_ops.clone();
            let task_open_options = open_options.clone();
            tokio::spawn(async move {
                let mut buf = BytesMut::with_capacity(task_read_buffer_size);
                let mut decoder = StorableRecordDecoder::new(task_max_record_size);
                let mut last_modification_time = None;
                'outer: loop {
                    buf.clear();

                    let topic_file_op = {
                        let Ok(mut locked_topic_file_ops) = task_topic_file_ops.lock() else {
                            break;
                        };
                        let topic_file_op = acquire_topic_file_ops(
                            &task_root_path,
                            &task_topic,
                            &mut locked_topic_file_ops,
                        );
                        drop(locked_topic_file_ops);
                        topic_file_op
                    };

                    let modification_time = topic_file_op.modification_time();
                    if modification_time <= last_modification_time {
                        if task_tx.is_closed() {
                            break;
                        }
                        time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                        continue;
                    } else {
                        last_modification_time = modification_time;
                    }

                    let mut topic_files = topic_file_op
                        .open_files(task_open_options.clone(), false)
                        .into_iter();
                    match topic_files.next() {
                        Some(Ok(mut topic_file)) => loop {
                            let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                                break;
                            };

                            let decode_fn = if len == 0 {
                                StorableRecordDecoder::decode_eof
                            } else {
                                StorableRecordDecoder::decode
                            };
                            let mut r = decode_fn(&mut decoder, &mut buf);
                            while let Ok(Some(record)) = r {
                                if task_offset.is_none() || Some(record.offset) > task_offset {
                                    let consumer_record = ConsumerRecord {
                                        topic: task_topic.clone(),
                                        headers: record
                                            .headers
                                            .into_iter()
                                            .map(|h| Header {
                                                key: h.key,
                                                value: h.value,
                                            })
                                            .collect(),
                                        timestamp: record.timestamp,
                                        key: record.key,
                                        value: record.value,
                                        partition: 0,
                                        offset: record.offset,
                                    };

                                    trace!("Consumed record: {:?}", consumer_record);

                                    if task_tx.send(consumer_record).await.is_err() {
                                        break 'outer;
                                    }

                                    task_offset = Some(record.offset)
                                }

                                r = decode_fn(&mut decoder, &mut buf);
                            }
                            match r {
                                Ok(Some(_)) => (),
                                Ok(None) if len == 0 => match topic_files.next() {
                                    Some(Ok(tf)) => topic_file = tf,
                                    Some(Err(e)) => {
                                        warn!("Error consuming topic file: {e} - aborting subscription for {task_topic}");
                                        break 'outer;
                                    }
                                    None => {
                                        if task_tx.is_closed() {
                                            break 'outer;
                                        }
                                        time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                        continue 'outer;
                                    }
                                },
                                Ok(None) => (),
                                Err(e) => {
                                    if task_tx.is_closed() {
                                        break 'outer;
                                    }
572 trace!("Topic is corrupt for {topic_file:?}. Error {e} occurred when subscribed. Retrying.");
                                    time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                    continue 'outer;
                                }
                            }
                        },
                        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                        Some(Err(e)) => {
                            warn!("Error reading topic file: {e} - aborting subscription");
                        }
                        None => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                    }
                }
            });
        }

        Box::pin(stream!({
            if let Some(it) = idle_timeout {
                while let Some(record) = time::timeout(it, rx.recv()).await.ok().flatten() {
                    yield record;
                }
            } else {
                while let Some(record) = rx.recv().await {
                    yield record;
                }
            }
            trace!("Ending subscriptions");
        }))
    }
}

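/// Fetch the shared `TopicFileOp` for `topic`, creating and caching a new
/// one if the topic has not been seen before.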
fn acquire_topic_file_ops(
    root_path: &Path,
    topic: &Topic,
    topic_file_ops: &mut HashMap<Topic, TopicFileOp>,
) -> TopicFileOp {
    if let Some(topic_file_op) = topic_file_ops.get(topic) {
        topic_file_op.clone()
    } else {
        let topic = topic.clone();
        let topic_file_op = TopicFileOp::new(root_path.to_path_buf(), topic.clone());
        topic_file_ops.insert(topic, topic_file_op.clone());
        topic_file_op
    }
}

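/// Scan a topic's files from oldest to newest, decoding each record to
/// determine the partition's beginning and end offsets. Returns `Ok(None)`
/// when the topic has no files, and an error when a record fails to
/// decode.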
fn find_offset(
    topic_file_op: &TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
    exclude_active_file: bool,
) -> io::Result<Option<PartitionOffsets>> {
    let mut open_options = OpenOptions::new();
    open_options.read(true);
    let mut topic_files = topic_file_op
        .open_files(open_options, exclude_active_file)
        .into_iter();
    match topic_files.next() {
        Some(Ok(mut topic_file)) => {
            let mut buf = BytesMut::with_capacity(read_buffer_size);
            let mut decoder = StorableRecordDecoder::new(max_record_size);
            let mut beginning_offset = None;
            let mut end_offset = None;
            loop {
                let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                    break;
                };

                let decode_fn = if len == 0 {
                    StorableRecordDecoder::decode_eof
                } else {
                    StorableRecordDecoder::decode
                };
                while let Some(record) = decode_fn(&mut decoder, &mut buf)? {
                    if beginning_offset.is_none() {
                        beginning_offset = Some(record.offset);
                        end_offset = Some(record.offset);
                    } else {
                        end_offset = Some(record.offset);
                    }
                }
                if len == 0 {
                    match topic_files.next() {
                        Some(Ok(tf)) => topic_file = tf,
                        Some(Err(e)) => return Err(e),
                        None => break,
                    }
                }
            }
            Ok(Some(PartitionOffsets {
                beginning_offset: beginning_offset.unwrap_or(0),
                end_offset: end_offset.unwrap_or(0),
            }))
        }
        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Some(Err(e)) => Err(e),
        None => Ok(None),
    }
}

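/// Truncate the active file back to the end of its last cleanly decodable
/// record, discarding a partially written tail such as one left behind by
/// a crash mid-append.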
fn recover_active_file(
    topic_file_op: &mut TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
) -> Result<(), TopicFileOpError> {
    let mut open_options = OpenOptions::new();
    open_options.read(true).write(true);
    let mut topic_file = topic_file_op.open_active_file(open_options)?;
    let mut buf = BytesMut::with_capacity(read_buffer_size);
    let mut decoder = StorableRecordDecoder::new(max_record_size);
    let mut bytes_read = None;
    loop {
        let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
            break;
        };

        let before_decode_len = buf.len();

        let decode_fn = if len == 0 {
            StorableRecordDecoder::decode_eof
        } else {
            StorableRecordDecoder::decode
        };
        let mut r = decode_fn(&mut decoder, &mut buf);
        while let Ok(Some(_)) = r {
            r = decode_fn(&mut decoder, &mut buf);
        }
        match r {
            Ok(None) if len == 0 => break,
            Ok(_) => (),
            Err(_) => {
                if let Some(bytes_read) = bytes_read {
                    topic_file
                        .set_len(bytes_read)
                        .map_err(TopicFileOpError::IoError)?;
                }
                break;
            }
        }

        let consumed_bytes = (before_decode_len - buf.len()) as u64;
        bytes_read =
            bytes_read.map_or_else(|| Some(consumed_bytes), |br| br.checked_add(consumed_bytes));
    }
    Ok(())
}

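/// Read from `file` into the spare capacity of `buf`, advancing the buffer
/// by the number of bytes read. The `unsafe` blocks are sound because the
/// uninitialized spare capacity is only handed to `File::read`, which
/// never reads from the slice it is given, and `advance_mut` is called
/// with exactly the number of bytes that the read initialized.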
fn read_buf<B>(file: &mut File, buf: &mut B) -> io::Result<usize>
where
    B: BufMut,
{
    let chunk = buf.chunk_mut();
    let len = chunk.len();
    let ptr = chunk.as_mut_ptr();
    let unused_buf = unsafe { slice::from_raw_parts_mut(ptr, len) };
    let result = file.read(unused_buf);
    if let Ok(len) = result {
        unsafe {
            buf.advance_mut(len);
        }
    }
    result
}

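/// Decodes `StorableRecord`s from a byte stream framed with postcard's
/// CRC32 encoding. An incomplete record yields `Ok(None)` so that more
/// bytes can be read, until the buffered bytes exceed `max_record_size`,
/// at which point the input is treated as corrupt.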
struct StorableRecordDecoder {
    max_record_size: usize,
}

impl StorableRecordDecoder {
    pub fn new(max_record_size: usize) -> Self {
        Self { max_record_size }
    }
}

impl Decoder for StorableRecordDecoder {
    type Item = StorableRecord;

    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let result = postcard::take_from_bytes_crc32::<StorableRecord>(src, CRC.digest());
        match result {
            Ok((record, remaining)) => {
                src.advance(src.len() - remaining.len());
                Ok(Some(record))
            }
            Err(e)
                if e == postcard::Error::DeserializeUnexpectedEnd
                    && src.len() <= self.max_record_size =>
            {
                Ok(None)
            }
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{env, sync::Arc};

    use test_log::test;
    use tokio::{fs, sync::Notify};
    use tokio_stream::StreamExt;

    use super::*;

    #[test(tokio::test)]
    async fn test_produce_consume() {
        let logged_dir = env::temp_dir().join("test_produce_consume");
        let logged_dir = logged_dir.to_string_lossy().to_string();
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir}");

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic, 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_produce_consume_with_split() {
        let logged_dir = env::temp_dir().join("test_produce_consume_with_split");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let mut cl = FileLog::new(logged_dir.clone());
        let mut task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        cl.register_compaction(topic.clone(), compaction::KeyBasedRetention::new(1))
            .await
            .unwrap();

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

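            // Force a file split by aging the active file directly, as the
            // compactor would.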
            let mut topic_file_op = {
                let locked_topic_file_ops = task_cl.topic_file_ops.lock().unwrap();
                locked_topic_file_ops.get(&task_topic).unwrap().clone()
            };
            topic_file_op.age_active_file().unwrap();

            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic.clone(), 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );

            let topic_file_path = logged_dir.join(task_topic.as_str());
            assert!(topic_file_path.exists());
            assert!(topic_file_path
                .with_extension(topic_file_op::HISTORY_FILE_EXTENSION)
                .exists());

            task_cl.close_topic(&task_topic);
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_consume_wait_for_append() {
        let logged_dir = env::temp_dir().join("test_consume_wait_for_append");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        let subscribing = Arc::new(Notify::new());
        let task_subscribing = subscribing.clone();

        let produced = Arc::new(Notify::new());
        let task_produced = produced.clone();

        let task_topic = topic.clone();
        tokio::spawn(async move {
            let subscriptions = vec![Subscription { topic: task_topic }];
            let mut records =
                task_cl.scoped_subscribe("some-consumer", vec![], subscriptions, None);
            task_subscribing.notify_one();

            while records.next().await.is_some() {
                task_produced.notify_one();
            }
        });

        subscribing.notified().await;
        time::sleep(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)).await;

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        produced.notified().await;
    }

    #[test(tokio::test)]
    async fn test_consume_with_idle() {
        let logged_dir = env::temp_dir().join("test_consume_with_idle");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;

        let cl = FileLog::new(logged_dir);

        let topic = Topic::from("my-topic");

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            offsets,
            subscriptions,
            Some(Duration::from_millis(100)),
        );
        assert!(records.next().await.is_none());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let subscriptions = vec![Subscription { topic }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            vec![],
            subscriptions,
            Some(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)),
        );
        assert!(records.next().await.is_some());
        assert!(records.next().await.is_none());
    }

    #[test(tokio::test)]
    async fn test_recovery() {
        let logged_dir = env::temp_dir().join("test_recovery");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir:?}");

        let cl = FileLog::new(logged_dir.clone());

        let topic = Topic::from("my-topic");

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-1".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        drop(cl);

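        // Simulate a crash part-way through an append by truncating the
        // last two bytes of the topic file.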
        let topic_file_path = logged_dir.join(topic.as_str());
        let topic_file = fs::OpenOptions::new()
            .write(true)
            .open(topic_file_path)
            .await
            .unwrap();

        let len = topic_file.metadata().await.unwrap().len();
        topic_file.set_len(len - 2).await.unwrap();

        let cl = FileLog::new(logged_dir.clone());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-3".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 0,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic: topic.clone(),
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-1".to_vec(),
                partition: 0,
                offset: 1
            })
        );

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-3".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }
}