#![doc = include_str!("../README.md")]

pub mod args;
pub mod compaction;
mod topic_file_op;

use async_stream::stream;
use async_trait::async_trait;
use bytes::{Buf, BufMut, BytesMut};
use chrono::{DateTime, Utc};
use compaction::{CompactionStrategy, Compactor, ScopedTopicSubscriber, TopicStorageOps};
use crc::{Crc, CRC_32_ISCSI};
use log::{error, trace, warn};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TimestampSecondsWithFrac};
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::slice;
use std::sync::{Arc, Mutex};
use std::{
    collections::{hash_map::Entry, HashMap, VecDeque},
    path::{Path, PathBuf},
    pin::Pin,
    time::Duration,
};
use streambed::commit_log::{
    CommitLog, ConsumerOffset, ConsumerRecord, Header, HeaderKey, Key, Offset, Partition,
    PartitionOffsets, ProducedOffset, ProducerError, ProducerRecord, Subscription, Topic,
};
use tokio::{
    sync::{mpsc, oneshot},
    time,
};
use tokio_stream::Stream;
use tokio_util::codec::Decoder;
use topic_file_op::TopicFileOp;

use crate::topic_file_op::TopicFileOpError;
const COMPACTOR_QUEUE_SIZE: usize = 10;
const COMPACTOR_WRITE_POLL: Duration = Duration::from_millis(10);
const CONSUMER_QUEUE_SIZE: usize = 10;
static CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

const PRODUCER_QUEUE_SIZE: usize = 10;
const TOPIC_FILE_CONSUMER_POLL: Duration = Duration::from_secs(1);
const TOPIC_FILE_PRODUCER_FLUSH: Duration = Duration::from_millis(10);

type ProduceReply = Result<ProducedOffset, ProducerError>;
type ProduceRequest = (ProducerRecord, oneshot::Sender<ProduceReply>);
type ShareableTopicMap<T> = Arc<Mutex<HashMap<Topic, T>>>;

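/// A [`CommitLog`] implementation that uses the file system as its backing
/// store. Each topic is kept in its own file beneath `root_path`, and
/// registering compaction for a topic splits it into an active file plus
/// history files.
///
/// A minimal usage sketch, assuming a Tokio runtime and that this crate is
/// consumed as `streambed_logged` (the topic name and value here are
/// illustrative only):
///
/// ```no_run
/// use streambed::commit_log::{CommitLog, ProducerRecord, Topic};
/// use streambed_logged::FileLog;
///
/// # async fn example() {
/// let cl = FileLog::new("/tmp/some-root");
/// cl.produce(ProducerRecord {
///     topic: Topic::from("my-topic"),
///     headers: vec![],
///     timestamp: None,
///     key: 0,
///     value: b"some-value".to_vec(),
///     partition: 0,
/// })
/// .await
/// .unwrap();
/// # }
/// ```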
#[derive(Clone)]
pub struct FileLog {
    compactor_txs: ShareableTopicMap<mpsc::Sender<u64>>,
    compaction_threshold_size: u64,
    compaction_write_buffer_size: usize,
    max_record_size: usize,
    read_buffer_size: usize,
    producer_txs: ShareableTopicMap<mpsc::Sender<ProduceRequest>>,
    pub(crate) topic_file_ops: ShareableTopicMap<TopicFileOp>,
    root_path: PathBuf,
    write_buffer_size: usize,
}

#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
pub struct StorableHeader {
    key: HeaderKey,
    value: Vec<u8>,
}

#[serde_as]
#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
struct StorableRecord {
    version: u32,
    headers: Vec<StorableHeader>,
    #[serde_as(as = "Option<TimestampSecondsWithFrac>")]
    timestamp: Option<DateTime<Utc>>,
    key: u64,
    value: Vec<u8>,
    offset: u64,
}

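/// The error returned when a compaction strategy cannot be registered,
/// for example when an internal lock has been poisoned.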
#[derive(Debug)]
pub struct CompactionRegistrationError;

impl FileLog {
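    /// Create a new file log rooted at `root_path`, using default sizes:
    /// a 64KiB compaction threshold, an 8KiB read buffer, a 64KiB
    /// compaction write buffer, an 8KiB write buffer, and a 16KiB maximum
    /// record size.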
    pub fn new<P>(root_path: P) -> Self
    where
        P: Into<PathBuf>,
    {
        Self::with_config(root_path, 64 * 1024, 8192, 64 * 1024, 8 * 1024, 16 * 1024)
    }

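    /// Create a new file log with explicit sizing. `compaction_threshold_size`
    /// is the active file size, in bytes, at which compaction is triggered for
    /// a topic; `read_buffer_size` and `write_buffer_size` control consumer
    /// reads and producer writes respectively; `compaction_write_buffer_size`
    /// buffers writes to compaction's work files; and `max_record_size` bounds
    /// how large a single encoded record may be before it is deemed corrupt.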
    pub fn with_config<P>(
        root_path: P,
        compaction_threshold_size: u64,
        read_buffer_size: usize,
        compaction_write_buffer_size: usize,
        write_buffer_size: usize,
        max_record_size: usize,
    ) -> Self
    where
        P: Into<PathBuf>,
    {
        Self {
            compactor_txs: Arc::new(Mutex::new(HashMap::new())),
            compaction_threshold_size,
            compaction_write_buffer_size,
            max_record_size,
            read_buffer_size,
            root_path: root_path.into(),
            producer_txs: Arc::new(Mutex::new(HashMap::new())),
            topic_file_ops: Arc::new(Mutex::new(HashMap::new())),
            write_buffer_size,
        }
    }

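    /// Release the producer and file resources held for a topic. Useful when
    /// a topic is no longer being produced to; resources are re-acquired
    /// lazily on the next use.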
    pub fn close_topic(&mut self, topic: &Topic) {
        if let Ok(mut locked_producer_txs) = self.producer_txs.lock() {
            locked_producer_txs.remove(topic);
        }
        if let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() {
            locked_topic_file_ops.remove(topic);
        }
    }

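    /// Register a compaction strategy for a topic. A background task is
    /// spawned to watch the size of the topic's active file and to run the
    /// strategy whenever `compaction_threshold_size` is exceeded.
    /// Registration fails if an internal lock cannot be acquired.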
    pub async fn register_compaction<CS>(
        &mut self,
        topic: Topic,
        compaction_strategy: CS,
    ) -> Result<(), CompactionRegistrationError>
    where
        CS: CompactionStrategy + Send + Sync + 'static,
    {
        let topic_file_op = {
            let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                return Err(CompactionRegistrationError);
            };
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops)
        };

        let mut age_active_file_topic_file_op = topic_file_op.clone();
        let age_active_file_read_buffer_size = self.read_buffer_size;
        let age_active_file_max_record_size = self.max_record_size;
        let new_work_file_topic_file_op = topic_file_op.clone();
        let recover_history_files_topic_file_op = topic_file_op.clone();
        let replace_history_files_topic_file_op = topic_file_op;

        let compaction_write_buffer_size = self.compaction_write_buffer_size;

        let mut compactor = Compactor::new(
            compaction_strategy,
            self.compaction_threshold_size,
            ScopedTopicSubscriber::new(self.clone(), topic.clone()),
            TopicStorageOps::new(
                move || {
                    age_active_file_topic_file_op.age_active_file()?;
                    find_offset(
                        &age_active_file_topic_file_op,
                        age_active_file_read_buffer_size,
                        age_active_file_max_record_size,
                        true,
                    )
                    .map(|o| o.map(|o| o.end_offset))
                    .map_err(TopicFileOpError::IoError)
                },
                move || new_work_file_topic_file_op.new_work_file(compaction_write_buffer_size),
                move || recover_history_files_topic_file_op.recover_history_files(),
                move || replace_history_files_topic_file_op.replace_history_files(),
            ),
        );

        let (compactor_tx, mut compactor_rx) = mpsc::channel::<u64>(COMPACTOR_QUEUE_SIZE);

        tokio::spawn(async move {
            let mut recv = compactor_rx.recv().await;
            while let Some(active_file_size) = recv {
                compactor.step(active_file_size).await;
                if compactor.is_idle() {
                    recv = compactor_rx.recv().await;
                } else if let Ok(r) = time::timeout(COMPACTOR_WRITE_POLL, compactor_rx.recv()).await
                {
                    recv = r;
                }
            }
        });

        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.insert(topic, compactor_tx);
        }

        Ok(())
    }

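    /// Unregister compaction for a topic so that no further compaction is
    /// scheduled for it.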
    pub fn unregister_compaction(&mut self, topic: &Topic) {
        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.remove(topic);
        }
    }
}

#[async_trait]
impl CommitLog for FileLog {
    async fn offsets(&self, topic: Topic, _partition: Partition) -> Option<PartitionOffsets> {
        let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
            return None;
        };
        let topic_file_op =
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops);
        drop(locked_topic_file_ops);

        find_offset(
            &topic_file_op,
            self.read_buffer_size,
            self.max_record_size,
            false,
        )
        .ok()
        .flatten()
    }

    async fn produce(&self, record: ProducerRecord) -> ProduceReply {
        let topic_producer = {
            let Ok(mut locked_producer_map) = self.producer_txs.lock() else {
                return Err(ProducerError::CannotProduce);
            };
            if let Some(topic_producer) = locked_producer_map.get(&record.topic) {
                let producer_tx = topic_producer.clone();
                drop(locked_producer_map);
                producer_tx
            } else {
                let (producer_tx, mut producer_rx) =
                    mpsc::channel::<ProduceRequest>(PRODUCER_QUEUE_SIZE);
                locked_producer_map.insert(record.topic.clone(), producer_tx.clone());
                drop(locked_producer_map);
                let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                    return Err(ProducerError::CannotProduce);
                };
                let mut topic_file_op = acquire_topic_file_ops(
                    &self.root_path,
                    &record.topic,
                    &mut locked_topic_file_ops,
                );
                drop(locked_topic_file_ops);

                let found_offsets = match find_offset(
                    &topic_file_op,
                    self.read_buffer_size,
                    self.max_record_size,
                    false,
                ) {
                    r @ Ok(_) => r,
                    Err(e) => {
                        error!("Error {e} when producing. Attempting to recover by truncating the active file.");

                        if let Err(e) = recover_active_file(
                            &mut topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                        ) {
                            error!("Error {e} when recovering. Unable to recover the active file.")
                        }

                        find_offset(
                            &topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                            false,
                        )
                    }
                };
                let mut next_offset = found_offsets
                    .map(|offsets| offsets.map_or(0, |offsets| offsets.end_offset.wrapping_add(1)));

                let task_root_path = self.root_path.clone();
                let task_compactor_txs = self.compactor_txs.clone();
                let task_topic_file_ops = self.topic_file_ops.clone();
                let task_write_buffer_size = self.write_buffer_size;

                let mut open_options = fs::OpenOptions::new();
                open_options.append(true).create(true);

                let mut file_size = topic_file_op
                    .active_file_size(&open_options, task_write_buffer_size)
                    .unwrap_or_default();

                tokio::spawn({
                    async move {
                        let mut recv = producer_rx.recv().await;
                        while let Some((record, reply_to)) = recv {
                            if let Ok(next_offset) = &mut next_offset {
                                let topic_file_op = {
                                    if let Ok(mut locked_topic_file_ops) =
                                        task_topic_file_ops.lock()
                                    {
                                        Some(acquire_topic_file_ops(
                                            &task_root_path,
                                            &record.topic,
                                            &mut locked_topic_file_ops,
                                        ))
                                    } else {
                                        None
                                    }
                                };
                                if let Some(mut topic_file_op) = topic_file_op {
                                    let r = topic_file_op.with_active_file(
                                        &open_options,
                                        task_write_buffer_size,
                                        |file| {
                                            let storable_record = StorableRecord {
                                                version: 0,
                                                headers: record
                                                    .headers
                                                    .into_iter()
                                                    .map(|h| StorableHeader {
                                                        key: h.key,
                                                        value: h.value,
                                                    })
                                                    .collect(),
                                                timestamp: record.timestamp,
                                                key: record.key,
                                                value: record.value,
                                                offset: *next_offset,
                                            };

                                            trace!("Producing record: {:?}", storable_record);

                                            if let Ok(buf) = postcard::to_stdvec_crc32(
                                                &storable_record,
                                                CRC.digest(),
                                            ) {
                                                file.write_all(&buf)
                                                    .map_err(TopicFileOpError::IoError)
                                                    .map(|_| buf.len())
                                            } else {
                                                Err(TopicFileOpError::CannotSerialize)
                                            }
                                        },
                                    );

                                    if let Ok((bytes_written, is_new_active_file)) = r {
                                        let _ = reply_to.send(Ok(ProducedOffset {
                                            offset: *next_offset,
                                        }));

                                        *next_offset = next_offset.wrapping_add(1);

                                        if is_new_active_file {
                                            file_size = 0;
                                        }
                                        file_size = file_size.wrapping_add(bytes_written as u64);

                                        let compactor_tx = {
                                            if let Ok(locked_task_compactor_txs) =
                                                task_compactor_txs.lock()
                                            {
                                                locked_task_compactor_txs
                                                    .get(&record.topic)
                                                    .cloned()
                                            } else {
                                                None
                                            }
                                        };
                                        if let Some(compactor_tx) = compactor_tx {
                                            let _ = compactor_tx.send(file_size).await;
                                        }

                                        match time::timeout(
                                            TOPIC_FILE_PRODUCER_FLUSH,
                                            producer_rx.recv(),
                                        )
                                        .await
                                        {
                                            Ok(r) => recv = r,
                                            Err(_) => {
                                                let _ = topic_file_op.flush_active_file();
                                                recv = producer_rx.recv().await;
                                            }
                                        }

                                        continue;
                                    }
                                }
                            }

                            let _ = reply_to.send(Err(ProducerError::CannotProduce));
                            recv = producer_rx.recv().await;
                        }
                    }
                });

                producer_tx
            }
        };

        let (reply_tx, reply_rx) = oneshot::channel();
        if topic_producer.send((record, reply_tx)).await.is_ok() {
            if let Ok(reply) = reply_rx.await {
                reply
            } else {
                Err(ProducerError::CannotProduce)
            }
        } else {
            Err(ProducerError::CannotProduce)
        }
    }

    fn scoped_subscribe<'a>(
        &'a self,
        _consumer_group_name: &str,
        offsets: Vec<ConsumerOffset>,
        subscriptions: Vec<Subscription>,
        idle_timeout: Option<Duration>,
    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        let offsets = offsets
            .iter()
            .map(|e| {
                assert_eq!(e.partition, 0);
                (e.topic.to_owned(), e.offset)
            })
            .collect::<HashMap<Topic, u64>>();

        let (tx, mut rx) = mpsc::channel(CONSUMER_QUEUE_SIZE);

        let mut open_options = OpenOptions::new();
        open_options.read(true);

        for s in subscriptions {
            let task_root_path = self.root_path.clone();
            let task_topic = s.topic.clone();
            let mut task_offset = offsets.get(&s.topic).copied();
            let task_tx = tx.clone();
            let task_read_buffer_size = self.read_buffer_size;
            let task_max_record_size = self.max_record_size;
            let task_topic_file_ops = self.topic_file_ops.clone();
            let task_open_options = open_options.clone();
            tokio::spawn(async move {
                let mut buf = BytesMut::with_capacity(task_read_buffer_size);
                let mut decoder = StorableRecordDecoder::new(task_max_record_size);
                'outer: loop {
                    buf.clear();

                    let topic_file_op = {
                        let Ok(mut locked_topic_file_ops) = task_topic_file_ops.lock() else {
                            break;
                        };
                        let topic_file_op = acquire_topic_file_ops(
                            &task_root_path,
                            &task_topic,
                            &mut locked_topic_file_ops,
                        );
                        drop(locked_topic_file_ops);
                        topic_file_op
                    };

                    let mut topic_files = topic_file_op
                        .open_files(task_open_options.clone(), false)
                        .into_iter();
                    match topic_files.next() {
                        Some(Ok(mut topic_file)) => loop {
                            let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                                break;
                            };

                            let decode_fn = if len == 0 {
                                StorableRecordDecoder::decode_eof
                            } else {
                                StorableRecordDecoder::decode
                            };
                            let mut r = decode_fn(&mut decoder, &mut buf);
                            while let Ok(Some(record)) = r {
                                if task_offset.is_none() || Some(record.offset) > task_offset {
                                    let consumer_record = ConsumerRecord {
                                        topic: task_topic.clone(),
                                        headers: record
                                            .headers
                                            .into_iter()
                                            .map(|h| Header {
                                                key: h.key,
                                                value: h.value,
                                            })
                                            .collect(),
                                        timestamp: record.timestamp,
                                        key: record.key,
                                        value: record.value,
                                        partition: 0,
                                        offset: record.offset,
                                    };

                                    trace!("Consumed record: {:?}", consumer_record);

                                    if task_tx.send(consumer_record).await.is_err() {
                                        break 'outer;
                                    }

                                    task_offset = Some(record.offset)
                                }

                                r = decode_fn(&mut decoder, &mut buf);
                            }
                            match r {
                                Ok(Some(_)) => (),
                                Ok(None) if len == 0 => match topic_files.next() {
                                    Some(Ok(tf)) => topic_file = tf,
                                    Some(Err(e)) => {
                                        warn!("Error consuming topic file: {e} - aborting subscription for {task_topic}");
                                        break 'outer;
                                    }
                                    None => {
                                        if task_tx.is_closed() {
                                            break 'outer;
                                        }
                                        time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                        continue 'outer;
                                    }
                                },
                                Ok(None) => (),
                                Err(e) => {
                                    if task_tx.is_closed() {
                                        break 'outer;
                                    }
                                    trace!("Topic is corrupt for {topic_file:?}. Error {e} occurred while consuming. Retrying.");
                                    time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                    continue 'outer;
                                }
                            }
                        },
                        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                        Some(Err(e)) => {
                            warn!("Error reading topic file: {e} - aborting subscription");
                            break;
                        }
                        None => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                    }
                }
            });
        }

        Box::pin(stream!({
            if let Some(it) = idle_timeout {
                while let Some(record) = time::timeout(it, rx.recv()).await.ok().flatten() {
                    yield record;
                }
            } else {
                while let Some(record) = rx.recv().await {
                    yield record;
                }
            }
            trace!("Ending subscriptions");
        }))
    }
}

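/// Look up the `TopicFileOp` for a topic, creating and caching one if it
/// does not yet exist.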
fn acquire_topic_file_ops(
    root_path: &Path,
    topic: &Topic,
    topic_file_ops: &mut HashMap<Topic, TopicFileOp>,
) -> TopicFileOp {
    if let Some(topic_file_op) = topic_file_ops.get(topic) {
        topic_file_op.clone()
    } else {
        let topic = topic.clone();
        let topic_file_op = TopicFileOp::new(root_path.to_path_buf(), topic.clone());
        topic_file_ops.insert(topic, topic_file_op.clone());
        topic_file_op
    }
}

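/// Scan a topic's files to determine its beginning and end offsets,
/// optionally excluding the active file. Returns `Ok(None)` if the topic
/// has no files at all.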
fn find_offset(
    topic_file_op: &TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
    exclude_active_file: bool,
) -> io::Result<Option<PartitionOffsets>> {
    let mut open_options = OpenOptions::new();
    open_options.read(true);
    let mut topic_files = topic_file_op
        .open_files(open_options, exclude_active_file)
        .into_iter();
    match topic_files.next() {
        Some(Ok(mut topic_file)) => {
            let mut buf = BytesMut::with_capacity(read_buffer_size);
            let mut decoder = StorableRecordDecoder::new(max_record_size);
            let mut beginning_offset = None;
            let mut end_offset = None;
            loop {
                let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                    break;
                };

                let decode_fn = if len == 0 {
                    StorableRecordDecoder::decode_eof
                } else {
                    StorableRecordDecoder::decode
                };
                while let Some(record) = decode_fn(&mut decoder, &mut buf)? {
                    if beginning_offset.is_none() {
                        beginning_offset = Some(record.offset);
                    }
                    end_offset = Some(record.offset);
                }
                if len == 0 {
                    match topic_files.next() {
                        Some(Ok(tf)) => topic_file = tf,
                        Some(Err(e)) => return Err(e),
                        None => break,
                    }
                }
            }
            Ok(Some(PartitionOffsets {
                beginning_offset: beginning_offset.unwrap_or(0),
                end_offset: end_offset.unwrap_or(0),
            }))
        }
        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Some(Err(e)) => Err(e),
        None => Ok(None),
    }
}

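/// Truncate a topic's active file back to the last record that decodes
/// cleanly, discarding any partially written bytes at the tail.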
fn recover_active_file(
    topic_file_op: &mut TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
) -> Result<(), TopicFileOpError> {
    let mut open_options = OpenOptions::new();
    open_options.read(true).write(true);
    let mut topic_file = topic_file_op.open_active_file(open_options)?;
    let mut buf = BytesMut::with_capacity(read_buffer_size);
    let mut decoder = StorableRecordDecoder::new(max_record_size);
    let mut bytes_read = None;
    loop {
        let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
            break;
        };

        let before_decode_len = buf.len();

        let decode_fn = if len == 0 {
            StorableRecordDecoder::decode_eof
        } else {
            StorableRecordDecoder::decode
        };
        let mut r = decode_fn(&mut decoder, &mut buf);
        while let Ok(Some(_)) = r {
            r = decode_fn(&mut decoder, &mut buf);
        }
        match r {
            Ok(None) if len == 0 => break,
            Ok(_) => (),
            Err(_) => {
                if let Some(bytes_read) = bytes_read {
                    topic_file
                        .set_len(bytes_read)
                        .map_err(TopicFileOpError::IoError)?;
                }
                break;
            }
        }

        let consumed_bytes = (before_decode_len - buf.len()) as u64;
        bytes_read =
            bytes_read.map_or_else(|| Some(consumed_bytes), |br| br.checked_add(consumed_bytes));
    }
    Ok(())
}

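/// Read from `file` into the unwritten portion of `buf`, advancing the
/// buffer by the number of bytes read. The `unsafe` here relies on
/// `Read::read` only ever writing to the possibly-uninitialized slice it
/// is handed, never reading from it.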
fn read_buf<B>(file: &mut File, buf: &mut B) -> io::Result<usize>
where
    B: BufMut,
{
    let chunk = buf.chunk_mut();
    let len = chunk.len();
    let ptr = chunk.as_mut_ptr();
    let unused_buf = unsafe { slice::from_raw_parts_mut(ptr, len) };
    let result = file.read(unused_buf);
    if let Ok(len) = result {
        unsafe {
            buf.advance_mut(len);
        }
    }
    result
}

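/// A [`Decoder`] that yields CRC32-checked, postcard-encoded
/// `StorableRecord`s from a byte stream. A record exceeding
/// `max_record_size` is treated as corrupt rather than incomplete.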
struct StorableRecordDecoder {
    max_record_size: usize,
}

impl StorableRecordDecoder {
    pub fn new(max_record_size: usize) -> Self {
        Self { max_record_size }
    }
}

impl Decoder for StorableRecordDecoder {
    type Item = StorableRecord;

    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let result = postcard::take_from_bytes_crc32::<StorableRecord>(src, CRC.digest());
        match result {
            Ok((record, remaining)) => {
                src.advance(src.len() - remaining.len());
                Ok(Some(record))
            }
            // An unexpected end means more bytes are required, unless the
            // accumulated bytes already exceed the maximum record size, in
            // which case the data is deemed corrupt.
            Err(e)
                if e == postcard::Error::DeserializeUnexpectedEnd
                    && src.len() <= self.max_record_size =>
            {
                Ok(None)
            }
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{env, sync::Arc};

    use test_log::test;
    use tokio::{fs, sync::Notify};
    use tokio_stream::StreamExt;

    use super::*;

    #[test(tokio::test)]
    async fn test_produce_consume() {
        let logged_dir = env::temp_dir().join("test_produce_consume");
        let logged_dir = logged_dir.to_string_lossy().to_string();
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir}");

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic, 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_produce_consume_with_split() {
        let logged_dir = env::temp_dir().join("test_produce_consume_with_split");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let mut cl = FileLog::new(logged_dir.clone());
        let mut task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        cl.register_compaction(topic.clone(), compaction::KeyBasedRetention::new(1))
            .await
            .unwrap();

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

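            // Age the active file directly, forcing a history file to be
            // created, much as compaction's own file management would.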
            let mut topic_file_op = {
                let locked_topic_file_ops = task_cl.topic_file_ops.lock().unwrap();
                locked_topic_file_ops.get(&task_topic).unwrap().clone()
            };
            topic_file_op.age_active_file().unwrap();

            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic.clone(), 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );

            let topic_file_path = logged_dir.join(task_topic.as_str());
            assert!(topic_file_path.exists());
            assert!(topic_file_path
                .with_extension(topic_file_op::HISTORY_FILE_EXTENSION)
                .exists());

            task_cl.close_topic(&task_topic);
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_consume_wait_for_append() {
        let logged_dir = env::temp_dir().join("test_consume_wait_for_append");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        let subscribing = Arc::new(Notify::new());
        let task_subscribing = subscribing.clone();

        let produced = Arc::new(Notify::new());
        let task_produced = produced.clone();

        let task_topic = topic.clone();
        tokio::spawn(async move {
            let subscriptions = vec![Subscription { topic: task_topic }];
            let mut records =
                task_cl.scoped_subscribe("some-consumer", vec![], subscriptions, None);
            task_subscribing.notify_one();

            while records.next().await.is_some() {
                task_produced.notify_one();
            }
        });

        subscribing.notified().await;
        time::sleep(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)).await;

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        produced.notified().await;
    }

    #[test(tokio::test)]
    async fn test_consume_with_idle() {
        let logged_dir = env::temp_dir().join("test_consume_with_idle");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;

        let cl = FileLog::new(logged_dir);

        let topic = Topic::from("my-topic");

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            offsets,
            subscriptions,
            Some(Duration::from_millis(100)),
        );
        assert!(records.next().await.is_none());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let subscriptions = vec![Subscription { topic }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            vec![],
            subscriptions,
            Some(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)),
        );
        assert!(records.next().await.is_some());
        assert!(records.next().await.is_none());
    }

    #[test(tokio::test)]
    async fn test_recovery() {
        let logged_dir = env::temp_dir().join("test_recovery");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir:?}");

        let cl = FileLog::new(logged_dir.clone());

        let topic = Topic::from("my-topic");

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-1".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        drop(cl);

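        // Simulate a crash part-way through a write by truncating the topic
        // file two bytes short of its last record.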
        let topic_file_path = logged_dir.join(topic.as_str());
        let topic_file = fs::OpenOptions::new()
            .write(true)
            .open(topic_file_path)
            .await
            .unwrap();

        let len = topic_file.metadata().await.unwrap().len();
        topic_file.set_len(len - 2).await.unwrap();

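        // Reopen the log; producing again should detect the corruption and
        // truncate the active file back to the last complete record.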
1130
1131 cl.produce(ProducerRecord {
1132 topic: topic.clone(),
1133 headers: vec![],
1134 timestamp: None,
1135 key: 0,
1136 value: b"some-value-3".to_vec(),
1137 partition: 0,
1138 })
1139 .await
1140 .unwrap();
1141
1142 let offsets = vec![ConsumerOffset {
1143 topic: topic.clone(),
1144 partition: 0,
1145 offset: 0,
1146 }];
1147 let subscriptions = vec![Subscription {
1148 topic: topic.clone(),
1149 }];
1150 let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);
1151
1152 assert_eq!(
1153 records.next().await,
1154 Some(ConsumerRecord {
1155 topic: topic.clone(),
1156 headers: vec![],
1157 timestamp: None,
1158 key: 0,
1159 value: b"some-value-1".to_vec(),
1160 partition: 0,
1161 offset: 1
1162 })
1163 );
1164
1165 assert_eq!(
1166 records.next().await,
1167 Some(ConsumerRecord {
1168 topic,
1169 headers: vec![],
1170 timestamp: None,
1171 key: 0,
1172 value: b"some-value-3".to_vec(),
1173 partition: 0,
1174 offset: 2
1175 })
1176 );
1177 }
1178}