streambed_logged/
lib.rs

#![doc = include_str!("../README.md")]

pub mod args;
pub mod compaction;
mod topic_file_op;

use async_stream::stream;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::{Buf, BytesMut};
use chrono::{DateTime, Utc};
use compaction::{CompactionStrategy, Compactor, ScopedTopicSubscriber, TopicStorageOps};
use crc::{Crc, CRC_32_ISCSI};
use log::{error, trace, warn};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TimestampSecondsWithFrac};
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::slice;
use std::sync::{Arc, Mutex};
use std::{
    collections::{hash_map::Entry, HashMap, VecDeque},
    path::{Path, PathBuf},
    pin::Pin,
    time::Duration,
};
use streambed::commit_log::{
    CommitLog, ConsumerOffset, ConsumerRecord, Header, HeaderKey, Offset, PartitionOffsets,
    ProducedOffset, ProducerError, ProducerRecord, Subscription, Topic,
};
use streambed::commit_log::{Key, Partition};
use tokio::{
    sync::{mpsc, oneshot},
    time,
};
use tokio_stream::Stream;
use tokio_util::codec::Decoder;
use topic_file_op::TopicFileOp;

use crate::topic_file_op::TopicFileOpError;

const COMPACTOR_QUEUE_SIZE: usize = 10;
const COMPACTOR_WRITE_POLL: Duration = Duration::from_millis(10);
const CONSUMER_QUEUE_SIZE: usize = 10;
static CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

const PRODUCER_QUEUE_SIZE: usize = 10;
const TOPIC_FILE_CONSUMER_POLL: Duration = Duration::from_secs(1);
const TOPIC_FILE_PRODUCER_FLUSH: Duration = Duration::from_millis(10);

type ProduceReply = Result<ProducedOffset, ProducerError>;
type ProduceRequest = (ProducerRecord, oneshot::Sender<ProduceReply>);
type ShareableTopicMap<T> = Arc<Mutex<HashMap<Topic, T>>>;
/// A commit log implementation that uses the file system as its
/// backing store.
///
/// Considerations:
///
/// 1. Partition values must always be zero; partitioning is not supported.
/// 2. The number of subscriptions of a topic will translate to
///    the number of tasks that are spawned, along with their
///    associated resources.
/// 3. Only one process can produce to a specific topic. There
///    is no process-wide locking considered. Multiple processes
///    can read a topic though.
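///
/// The following is a minimal sketch of producing to and then consuming from
/// a topic; the path, topic name, and record values are illustrative only.
///
/// ```no_run
/// use streambed::commit_log::{CommitLog, ProducerRecord, Subscription, Topic};
/// use streambed_logged::FileLog;
/// use tokio_stream::StreamExt;
///
/// # async fn example() {
/// let cl = FileLog::new("/var/lib/logged");
///
/// let topic = Topic::from("my-topic");
/// cl.produce(ProducerRecord {
///     topic: topic.clone(),
///     headers: vec![],
///     timestamp: None,
///     key: 0,
///     value: b"some-value".to_vec(),
///     partition: 0,
/// })
/// .await
/// .unwrap();
///
/// let subscriptions = vec![Subscription { topic }];
/// let mut records = cl.scoped_subscribe("some-consumer", vec![], subscriptions, None);
/// let _first = records.next().await;
/// # }
/// ```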
#[derive(Clone)]
pub struct FileLog {
    compactor_txs: ShareableTopicMap<mpsc::Sender<u64>>,
    compaction_threshold_size: u64,
    compaction_write_buffer_size: usize,
    max_record_size: usize,
    read_buffer_size: usize,
    producer_txs: ShareableTopicMap<mpsc::Sender<ProduceRequest>>,
    pub(crate) topic_file_ops: ShareableTopicMap<TopicFileOp>,
    root_path: PathBuf,
    write_buffer_size: usize,
}

#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
pub struct StorableHeader {
    key: HeaderKey,
    value: Vec<u8>,
}

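/// The on-disk representation of a record. Each record is serialized with
/// postcard along with a CRC of its bytes so that torn or corrupt writes can
/// be detected and recovered from.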
#[serde_as]
#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
struct StorableRecord {
    version: u32,
    headers: Vec<StorableHeader>,
    #[serde_as(as = "Option<TimestampSecondsWithFrac>")]
    timestamp: Option<DateTime<Utc>>,
    key: u64,
    value: Vec<u8>,
    offset: u64,
}

/// Some unrecoverable issue when attempting to register compaction.
#[derive(Debug)]
pub struct CompactionRegistrationError;

impl FileLog {
    /// Construct a new file log that will also spawn a task for each
    /// topic being produced.
    pub fn new<P>(root_path: P) -> Self
    where
        P: Into<PathBuf>,
    {
        Self::with_config(root_path, 64 * 1024, 8192, 64 * 1024, 8 * 1024, 16 * 1024)
    }

    /// Construct a new file log that will also spawn a task for each
    /// topic being produced. The `compaction_threshold_size` is the size,
    /// in bytes, that the active file must reach before the compactor decides
    /// to perform a compaction. This typically equates to the block size on
    /// disk i.e. 64KB for flash based storage. 64KB is still small enough that
    /// scans over a topic are relatively fast, working on the principle of
    /// holding roughly 2,000 records. We also require read and write buffer
    /// sizes to reduce system calls. When writing, the buffer is flushed either
    /// when it reaches capacity or when there is no other write to perform.
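    ///
    /// A minimal sketch, passing the same values that [`FileLog::new`] uses by
    /// default (the path is illustrative):
    ///
    /// ```no_run
    /// use streambed_logged::FileLog;
    ///
    /// let _cl = FileLog::with_config(
    ///     "/var/lib/logged",
    ///     64 * 1024, // compaction_threshold_size
    ///     8192,      // read_buffer_size
    ///     64 * 1024, // compaction_write_buffer_size
    ///     8 * 1024,  // write_buffer_size
    ///     16 * 1024, // max_record_size
    /// );
    /// ```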
    pub fn with_config<P>(
        root_path: P,
        compaction_threshold_size: u64,
        read_buffer_size: usize,
        compaction_write_buffer_size: usize,
        write_buffer_size: usize,
        max_record_size: usize,
    ) -> Self
    where
        P: Into<PathBuf>,
    {
        Self {
            compactor_txs: Arc::new(Mutex::new(HashMap::new())),
            compaction_threshold_size,
            compaction_write_buffer_size,
            max_record_size,
            read_buffer_size,
            root_path: root_path.into(),
            producer_txs: Arc::new(Mutex::new(HashMap::new())),
            topic_file_ops: Arc::new(Mutex::new(HashMap::new())),
            write_buffer_size,
        }
    }

    /// Frees resources associated with a topic, but not any associated compaction.
    /// Invoking this method is benign: if consuming or producing occurs on the
    /// topic after it has been closed, its resources will be re-established.
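    ///
    /// A small sketch (the path and topic name are illustrative):
    ///
    /// ```no_run
    /// use streambed::commit_log::Topic;
    /// use streambed_logged::FileLog;
    ///
    /// let mut cl = FileLog::new("/var/lib/logged");
    /// cl.close_topic(&Topic::from("my-topic"));
    /// ```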
    pub fn close_topic(&mut self, topic: &Topic) {
        if let Ok(mut locked_producer_txs) = self.producer_txs.lock() {
            locked_producer_txs.remove(topic);
        }
        if let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() {
            locked_topic_file_ops.remove(topic);
        }
    }

    /// Register compaction for a given topic. Any previously registered compaction
    /// is replaced. A new task for compaction will be created in the background.
    ///
    /// Compaction's memory can be controlled somewhat through `compaction_write_buffer_size`
    /// when creating this file commit log. This buffer size is selected to minimize
    /// writing to flash and will be allocated once per topic compaction registered here.
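    ///
    /// A minimal sketch using this crate's [`compaction::KeyBasedRetention`]
    /// strategy; the path, topic name, and retention argument are illustrative:
    ///
    /// ```no_run
    /// use streambed::commit_log::Topic;
    /// use streambed_logged::{compaction, FileLog};
    ///
    /// # async fn example() {
    /// let mut cl = FileLog::new("/var/lib/logged");
    /// cl.register_compaction(Topic::from("my-topic"), compaction::KeyBasedRetention::new(1))
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```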
    pub async fn register_compaction<CS>(
        &mut self,
        topic: Topic,
        compaction_strategy: CS,
    ) -> Result<(), CompactionRegistrationError>
    where
        CS: CompactionStrategy + Send + Sync + 'static,
    {
        let topic_file_op = {
            let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                return Err(CompactionRegistrationError);
            };
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops)
        };

        let mut age_active_file_topic_file_op = topic_file_op.clone();
        let age_active_file_read_buffer_size = self.read_buffer_size;
        let age_active_file_max_record_size = self.max_record_size;
        let new_work_file_topic_file_op = topic_file_op.clone();
        let recover_history_files_topic_file_op = topic_file_op.clone();
        let replace_history_files_topic_file_op = topic_file_op;

        let compaction_write_buffer_size = self.compaction_write_buffer_size;

        let mut compactor = Compactor::new(
            compaction_strategy,
            self.compaction_threshold_size,
            ScopedTopicSubscriber::new(self.clone(), topic.clone()),
            TopicStorageOps::new(
                move || {
                    age_active_file_topic_file_op.age_active_file()?;
                    find_offset(
                        &age_active_file_topic_file_op,
                        age_active_file_read_buffer_size,
                        age_active_file_max_record_size,
                        true,
                    )
                    .map(|o| o.map(|o| o.end_offset))
                    .map_err(TopicFileOpError::IoError)
                },
                move || new_work_file_topic_file_op.new_work_file(compaction_write_buffer_size),
                move || recover_history_files_topic_file_op.recover_history_files(),
                move || replace_history_files_topic_file_op.replace_history_files(),
            ),
        );

        let (compactor_tx, mut compactor_rx) = mpsc::channel::<u64>(COMPACTOR_QUEUE_SIZE);

        tokio::spawn(async move {
            let mut recv = compactor_rx.recv().await;
            while let Some(active_file_size) = recv {
                compactor.step(active_file_size).await;
                if compactor.is_idle() {
                    recv = compactor_rx.recv().await;
                } else if let Ok(r) = time::timeout(COMPACTOR_WRITE_POLL, compactor_rx.recv()).await
                {
                    recv = r;
                }
            }
        });

        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.insert(topic, compactor_tx);
        }

        Ok(())
    }

    /// Unregister compaction for a given topic
    pub fn unregister_compaction(&mut self, topic: &Topic) {
        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.remove(topic);
        }
    }
}

#[async_trait]
impl CommitLog for FileLog {
    async fn offsets(&self, topic: Topic, _partition: Partition) -> Option<PartitionOffsets> {
        let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
            return None;
        };
        let topic_file_op =
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops);
        drop(locked_topic_file_ops);

        find_offset(
            &topic_file_op,
            self.read_buffer_size,
            self.max_record_size,
            false,
        )
        .ok()
        .flatten()
    }

    async fn produce(&self, record: ProducerRecord) -> ProduceReply {
        let topic_producer = {
            let Ok(mut locked_producer_map) = self.producer_txs.lock() else {
                return Err(ProducerError::CannotProduce);
            };
            if let Some(topic_producer) = locked_producer_map.get(&record.topic) {
                let producer_tx = topic_producer.clone();
                drop(locked_producer_map);
                producer_tx
            } else {
                let (producer_tx, mut producer_rx) =
                    mpsc::channel::<ProduceRequest>(PRODUCER_QUEUE_SIZE);
                locked_producer_map.insert(record.topic.clone(), producer_tx.clone());
                drop(locked_producer_map); // Drop early so that we never hold this lock and the topic file ops lock at the same time.

                let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                    return Err(ProducerError::CannotProduce);
                };
                let mut topic_file_op = acquire_topic_file_ops(
                    &self.root_path,
                    &record.topic,
                    &mut locked_topic_file_ops,
                );
                drop(locked_topic_file_ops);

                let found_offsets = match find_offset(
                    &topic_file_op,
                    self.read_buffer_size,
                    self.max_record_size,
                    false,
                ) {
                    r @ Ok(_) => r,
                    Err(e) => {
                        error!("Error {e} when producing. Attempting to recover by truncating the active file.");

                        if let Err(e) = recover_active_file(
                            &mut topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                        ) {
                            error!("Error {e} when recovering. Unable to recover the active file.")
                        }

                        find_offset(
                            &topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                            false,
                        )
                    }
                };
                let mut next_offset = found_offsets
                    .map(|offsets| offsets.map_or(0, |offsets| offsets.end_offset.wrapping_add(1)));

                let task_root_path = self.root_path.clone();
                let task_compactor_txs = self.compactor_txs.clone();
                let task_topic_file_ops = self.topic_file_ops.clone();
                let task_write_buffer_size = self.write_buffer_size;

                let mut open_options = fs::OpenOptions::new();
                open_options.append(true).create(true);

                let mut file_size = topic_file_op
                    .active_file_size(&open_options, task_write_buffer_size)
                    .unwrap_or_default();

                tokio::spawn({
                    async move {
                        let mut recv = producer_rx.recv().await;
                        while let Some((record, reply_to)) = recv {
                            if let Ok(next_offset) = &mut next_offset {
                                let topic_file_op = {
                                    if let Ok(mut locked_topic_file_ops) =
                                        task_topic_file_ops.lock()
                                    {
                                        Some(acquire_topic_file_ops(
                                            &task_root_path,
                                            &record.topic,
                                            &mut locked_topic_file_ops,
                                        ))
                                    } else {
                                        None
                                    }
                                };
                                if let Some(mut topic_file_op) = topic_file_op {
                                    let r = topic_file_op.with_active_file(
                                        &open_options,
                                        task_write_buffer_size,
                                        |file| {
                                            let storable_record = StorableRecord {
                                                version: 0,
                                                headers: record
                                                    .headers
                                                    .into_iter()
                                                    .map(|h| StorableHeader {
                                                        key: h.key,
                                                        value: h.value,
                                                    })
                                                    .collect(),
                                                timestamp: record.timestamp,
                                                key: record.key,
                                                value: record.value,
                                                offset: *next_offset,
                                            };

                                            trace!("Producing record: {:?}", storable_record);

                                            if let Ok(buf) = postcard::to_stdvec_crc32(
                                                &storable_record,
                                                CRC.digest(),
                                            ) {
                                                file.write_all(&buf)
                                                    .map_err(TopicFileOpError::IoError)
                                                    .map(|_| buf.len())
                                            } else {
                                                Err(TopicFileOpError::CannotSerialize)
                                            }
                                        },
                                    );

                                    if let Ok((bytes_written, is_new_active_file)) = r {
                                        let _ = reply_to.send(Ok(ProducedOffset {
                                            offset: *next_offset,
                                        }));

                                        *next_offset = next_offset.wrapping_add(1);

                                        if is_new_active_file {
                                            file_size = 0;
                                        }
                                        file_size = file_size.wrapping_add(bytes_written as u64);

                                        let compactor_tx = {
                                            if let Ok(locked_task_compactor_txs) =
                                                task_compactor_txs.lock()
                                            {
                                                locked_task_compactor_txs
                                                    .get(&record.topic)
                                                    .cloned()
                                            } else {
                                                None
                                            }
                                        };
                                        if let Some(compactor_tx) = compactor_tx {
                                            let _ = compactor_tx.send(file_size).await;
                                        }

                                        match time::timeout(
                                            TOPIC_FILE_PRODUCER_FLUSH,
                                            producer_rx.recv(),
                                        )
                                        .await
                                        {
                                            Ok(r) => recv = r,
                                            Err(_) => {
                                                let _ = topic_file_op.flush_active_file();
                                                recv = producer_rx.recv().await;
                                            }
                                        }

                                        continue;
                                    }
                                }
                            }

                            let _ = reply_to.send(Err(ProducerError::CannotProduce));
                            recv = producer_rx.recv().await;
                        }
                    }
                });

                producer_tx
            }
        };

        let (reply_tx, reply_rx) = oneshot::channel();
        if topic_producer.send((record, reply_tx)).await.is_ok() {
            if let Ok(reply) = reply_rx.await {
                reply
            } else {
                Err(ProducerError::CannotProduce)
            }
        } else {
            Err(ProducerError::CannotProduce)
        }
    }

    fn scoped_subscribe<'a>(
        &'a self,
        _consumer_group_name: &str,
        offsets: Vec<ConsumerOffset>,
        subscriptions: Vec<Subscription>,
        idle_timeout: Option<Duration>,
    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        let offsets = offsets
            .iter()
            .map(|e| {
                assert_eq!(e.partition, 0);
                (e.topic.to_owned(), e.offset)
            })
            .collect::<HashMap<Topic, u64>>();

        let (tx, mut rx) = mpsc::channel(CONSUMER_QUEUE_SIZE);

        let mut open_options = OpenOptions::new();
        open_options.read(true);

        for s in subscriptions {
            let task_root_path = self.root_path.clone();
            let task_topic = s.topic.clone();
            let mut task_offset = offsets.get(&s.topic).copied();
            let task_tx = tx.clone();
            let task_read_buffer_size = self.read_buffer_size;
            let task_max_record_size = self.max_record_size;
            let task_topic_file_ops = self.topic_file_ops.clone();
            let task_open_options = open_options.clone();
            tokio::spawn(async move {
                let mut buf = BytesMut::with_capacity(task_read_buffer_size);
                let mut decoder = StorableRecordDecoder::new(task_max_record_size);
                'outer: loop {
                    buf.clear();

                    let topic_file_op = {
                        let Ok(mut locked_topic_file_ops) = task_topic_file_ops.lock() else {
                            break;
                        };
                        let topic_file_op = acquire_topic_file_ops(
                            &task_root_path,
                            &task_topic,
                            &mut locked_topic_file_ops,
                        );
                        drop(locked_topic_file_ops);
                        topic_file_op
                    };

                    let mut topic_files = topic_file_op
                        .open_files(task_open_options.clone(), false)
                        .into_iter();
                    match topic_files.next() {
                        Some(Ok(mut topic_file)) => loop {
                            let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                                break;
                            };

                            let decode_fn = if len == 0 {
                                StorableRecordDecoder::decode_eof
                            } else {
                                StorableRecordDecoder::decode
                            };
                            let mut r = decode_fn(&mut decoder, &mut buf);
                            while let Ok(Some(record)) = r {
                                if task_offset.is_none() || Some(record.offset) > task_offset {
                                    let consumer_record = ConsumerRecord {
                                        topic: task_topic.clone(),
                                        headers: record
                                            .headers
                                            .into_iter()
                                            .map(|h| Header {
                                                key: h.key,
                                                value: h.value,
                                            })
                                            .collect(),
                                        timestamp: record.timestamp,
                                        key: record.key,
                                        value: record.value,
                                        partition: 0,
                                        offset: record.offset,
                                    };

                                    trace!("Consumed record: {:?}", consumer_record);

                                    if task_tx.send(consumer_record).await.is_err() {
                                        break 'outer;
                                    }

                                    task_offset = Some(record.offset)
                                }

                                r = decode_fn(&mut decoder, &mut buf);
                            }
                            match r {
                                Ok(Some(_)) => (), // Should never happen
                                Ok(None) if len == 0 => match topic_files.next() {
                                    Some(Ok(tf)) => topic_file = tf,
                                    Some(Err(e)) => {
                                        warn!("Error consuming topic file: {e} - aborting subscription for {task_topic}");
                                        break 'outer;
                                    }
                                    None => {
                                        if task_tx.is_closed() {
                                            break 'outer;
                                        }
                                        time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                        continue 'outer;
                                    }
                                },
                                Ok(None) => (),
                                Err(e) => {
                                    if task_tx.is_closed() {
                                        break 'outer;
                                    }
                                    trace!("Topic is corrupt for {topic_file:?}. Error {e} occurred when subscribed. Retrying.");
                                    time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                    continue 'outer;
                                }
                            }
                        },
                        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                        Some(Err(e)) => {
                            warn!("Error reading topic file: {e} - aborting subscription");
                        }
                        None => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                    }
                }
            });
        }

        Box::pin(stream!({
            if let Some(it) = idle_timeout {
                while let Some(record) = time::timeout(it, rx.recv()).await.ok().flatten() {
                    yield record;
                }
            } else {
                while let Some(record) = rx.recv().await {
                    yield record;
                }
            }
            trace!("Ending subscriptions");
        }))
    }
}

// Returns the shared `TopicFileOp` for a topic, creating and caching one if it
// is not already present.
fn acquire_topic_file_ops(
    root_path: &Path,
    topic: &Topic,
    topic_file_ops: &mut HashMap<Topic, TopicFileOp>,
) -> TopicFileOp {
    if let Some(topic_file_op) = topic_file_ops.get(topic) {
        topic_file_op.clone()
    } else {
        let topic = topic.clone();
        let topic_file_op = TopicFileOp::new(root_path.to_path_buf(), topic.clone());
        topic_file_ops.insert(topic, topic_file_op.clone());
        topic_file_op
    }
}

// Scans a topic's files to determine its beginning and end offsets, optionally
// excluding the active file.
fn find_offset(
    topic_file_op: &TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
    exclude_active_file: bool,
) -> io::Result<Option<PartitionOffsets>> {
    let mut open_options = OpenOptions::new();
    open_options.read(true);
    let mut topic_files = topic_file_op
        .open_files(open_options, exclude_active_file)
        .into_iter();
    match topic_files.next() {
        Some(Ok(mut topic_file)) => {
            let mut buf = BytesMut::with_capacity(read_buffer_size);
            let mut decoder = StorableRecordDecoder::new(max_record_size);
            let mut beginning_offset = None;
            let mut end_offset = None;
            loop {
                let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                    break;
                };

                let decode_fn = if len == 0 {
                    StorableRecordDecoder::decode_eof
                } else {
                    StorableRecordDecoder::decode
                };
                while let Some(record) = decode_fn(&mut decoder, &mut buf)? {
                    if beginning_offset.is_none() {
                        beginning_offset = Some(record.offset);
                        end_offset = Some(record.offset);
                    } else {
                        end_offset = Some(record.offset);
                    }
                }
                if len == 0 {
                    match topic_files.next() {
                        Some(Ok(tf)) => topic_file = tf,
                        Some(Err(e)) => return Err(e),
                        None => break,
                    }
                }
            }
            Ok(Some(PartitionOffsets {
                beginning_offset: beginning_offset.unwrap_or(0),
                end_offset: end_offset.unwrap_or(0),
            }))
        }
        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Some(Err(e)) => Err(e),
        None => Ok(None),
    }
}

// Attempts to recover the active file by truncating off trailing bytes that
// fail to decode as records.
fn recover_active_file(
    topic_file_op: &mut TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
) -> Result<(), TopicFileOpError> {
    let mut open_options = OpenOptions::new();
    open_options.read(true).write(true);
    let mut topic_file = topic_file_op.open_active_file(open_options)?;
    let mut buf = BytesMut::with_capacity(read_buffer_size);
    let mut decoder = StorableRecordDecoder::new(max_record_size);
    let mut bytes_read = None;
    loop {
        let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
            break;
        };

        let before_decode_len = buf.len();

        let decode_fn = if len == 0 {
            StorableRecordDecoder::decode_eof
        } else {
            StorableRecordDecoder::decode
        };
        let mut r = decode_fn(&mut decoder, &mut buf);
        while let Ok(Some(_)) = r {
            r = decode_fn(&mut decoder, &mut buf);
        }
        match r {
            Ok(None) if len == 0 => break,
            Ok(_) => (),
            Err(_) => {
                if let Some(bytes_read) = bytes_read {
                    topic_file
                        .set_len(bytes_read)
                        .map_err(TopicFileOpError::IoError)?;
                }
                break;
            }
        }

        let consumed_bytes = (before_decode_len - buf.len()) as u64;
        bytes_read =
            bytes_read.map_or_else(|| Some(consumed_bytes), |br| br.checked_add(consumed_bytes));
    }
    Ok(())
}

// Similar to Tokio's AsyncReadExt [`read_buf`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncReadExt.html#method.read_buf).
// Thanks to Alice Ryhl: https://discord.com/channels/500028886025895936/627696030334582784/1071037851980021761
fn read_buf<B>(file: &mut File, buf: &mut B) -> io::Result<usize>
where
    B: BufMut,
{
    let chunk = buf.chunk_mut();
    let len = chunk.len();
    let ptr = chunk.as_mut_ptr();
    let unused_buf = unsafe { slice::from_raw_parts_mut(ptr, len) };
    let result = file.read(unused_buf);
    if let Ok(len) = result {
        unsafe {
            buf.advance_mut(len);
        }
    }
    result
}

// Decodes `StorableRecord`s framed with postcard and a trailing CRC. A
// `DeserializeUnexpectedEnd` within `max_record_size` means more bytes are
// needed; anything else is treated as corruption.
struct StorableRecordDecoder {
    max_record_size: usize,
}

impl StorableRecordDecoder {
    pub fn new(max_record_size: usize) -> Self {
        Self { max_record_size }
    }
}

impl Decoder for StorableRecordDecoder {
    type Item = StorableRecord;

    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let result = postcard::take_from_bytes_crc32::<StorableRecord>(src, CRC.digest());
        match result {
            Ok((record, remaining)) => {
                src.advance(src.len() - remaining.len());
                Ok(Some(record))
            }
            Err(e)
                if e == postcard::Error::DeserializeUnexpectedEnd
                    && src.len() <= self.max_record_size =>
            {
                Ok(None)
            }
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{env, sync::Arc};

    use test_log::test;
    use tokio::{fs, sync::Notify};
    use tokio_stream::StreamExt;

    use super::*;

    #[test(tokio::test)]
    async fn test_produce_consume() {
        let logged_dir = env::temp_dir().join("test_produce_consume");
        // Deliberately converting to a String to test Into<PathBuf>
        let logged_dir = logged_dir.to_string_lossy().to_string();
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir}");

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic, 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_produce_consume_with_split() {
        let logged_dir = env::temp_dir().join("test_produce_consume_with_split");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let mut cl = FileLog::new(logged_dir.clone());
        let mut task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        cl.register_compaction(topic.clone(), compaction::KeyBasedRetention::new(1))
            .await
            .unwrap();

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            // At this point we're going to pretend we've performed a compaction
            // that would result in the commit log file being split into a .history
            // file.

            let mut topic_file_op = {
                let locked_topic_file_ops = task_cl.topic_file_ops.lock().unwrap();
                locked_topic_file_ops.get(&task_topic).unwrap().clone()
            };
            topic_file_op.age_active_file().unwrap();

            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic.clone(), 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );

            let topic_file_path = logged_dir.join(task_topic.as_str());
            assert!(topic_file_path.exists());
            assert!(topic_file_path
                .with_extension(topic_file_op::HISTORY_FILE_EXTENSION)
                .exists());

            task_cl.close_topic(&task_topic);
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_consume_wait_for_append() {
        let logged_dir = env::temp_dir().join("test_consume_wait_for_append");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        let subscribing = Arc::new(Notify::new());
        let task_subscribing = subscribing.clone();

        let produced = Arc::new(Notify::new());
        let task_produced = produced.clone();

        let task_topic = topic.clone();
        tokio::spawn(async move {
            let subscriptions = vec![Subscription { topic: task_topic }];
            let mut records =
                task_cl.scoped_subscribe("some-consumer", vec![], subscriptions, None);
            task_subscribing.notify_one();

            while records.next().await.is_some() {
                task_produced.notify_one();
            }
        });

        subscribing.notified().await;
        time::sleep(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)).await;

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        produced.notified().await;
    }

    #[test(tokio::test)]
    async fn test_consume_with_idle() {
        let logged_dir = env::temp_dir().join("test_consume_with_idle");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;

        let cl = FileLog::new(logged_dir);

        let topic = Topic::from("my-topic");

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            offsets,
            subscriptions,
            Some(Duration::from_millis(100)),
        );
        assert!(records.next().await.is_none());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let subscriptions = vec![Subscription { topic }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            vec![],
            subscriptions,
            Some(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)),
        );
        assert!(records.next().await.is_some());
        assert!(records.next().await.is_none());
    }

    #[test(tokio::test)]
    async fn test_recovery() {
        let logged_dir = env::temp_dir().join("test_recovery");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir:?}");

        let cl = FileLog::new(logged_dir.clone());

        let topic = Topic::from("my-topic");

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-1".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        // Ensure everything gets flushed out and left in a good state.
        drop(cl);

        // Now corrupt the log by knocking a few bytes off the end

        let topic_file_path = logged_dir.join(topic.as_str());
        let topic_file = fs::OpenOptions::new()
            .write(true)
            .open(topic_file_path)
            .await
            .unwrap();

        let len = topic_file.metadata().await.unwrap().len();
        topic_file.set_len(len - 2).await.unwrap();

        // Start producing to a new log - simulates a restart.

        let cl = FileLog::new(logged_dir.clone());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-3".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 0,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic: topic.clone(),
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-1".to_vec(),
                partition: 0,
                offset: 1
            })
        );

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-3".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }
}