streambed_logged/
lib.rs

#![doc = include_str!("../README.md")]

pub mod args;
pub mod compaction;
mod topic_file_op;

use async_stream::stream;
use async_trait::async_trait;
use bytes::BufMut;
use bytes::{Buf, BytesMut};
use chrono::{DateTime, Utc};
use compaction::{CompactionStrategy, Compactor, ScopedTopicSubscriber, TopicStorageOps};
use crc::{Crc, CRC_32_ISCSI};
use log::{error, trace, warn};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, TimestampSecondsWithFrac};
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::slice;
use std::sync::{Arc, Mutex};
use std::{
    collections::{hash_map::Entry, HashMap, VecDeque},
    path::{Path, PathBuf},
    pin::Pin,
    time::Duration,
};
use streambed::commit_log::{
    CommitLog, ConsumerOffset, ConsumerRecord, Header, HeaderKey, Offset, PartitionOffsets,
    ProducedOffset, ProducerError, ProducerRecord, Subscription, Topic,
};
use streambed::commit_log::{Key, Partition};
use tokio::{
    sync::{mpsc, oneshot},
    time,
};
use tokio_stream::Stream;
use tokio_util::codec::Decoder;
use topic_file_op::TopicFileOp;

use crate::topic_file_op::TopicFileOpError;

const COMPACTOR_QUEUE_SIZE: usize = 10;
const COMPACTOR_WRITE_POLL: Duration = Duration::from_millis(10);
const CONSUMER_QUEUE_SIZE: usize = 10;
static CRC: Crc<u32> = Crc::<u32>::new(&CRC_32_ISCSI);

const PRODUCER_QUEUE_SIZE: usize = 10;
const TOPIC_FILE_CONSUMER_POLL: Duration = Duration::from_secs(1);
const TOPIC_FILE_PRODUCER_FLUSH: Duration = Duration::from_millis(10);

type ProduceReply = Result<ProducedOffset, ProducerError>;
type ProduceRequest = (ProducerRecord, oneshot::Sender<ProduceReply>);
type ShareableTopicMap<T> = Arc<Mutex<HashMap<Topic, T>>>;

/// A commit log implementation that uses the file system as its
/// backing store.
///
/// Considerations:
///
/// 1. Partition values must always be zero; partitions are not
///    supported.
/// 2. The number of subscriptions of a topic will translate to
///    the number of tasks that are spawned, along with their
///    associated resources.
/// 3. Only one process can produce to a specific topic. There
///    is no process-wide locking considered. Multiple processes
///    can read a topic though.
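///
/// A minimal usage sketch (producing one record and then subscribing); the
/// directory path is illustrative only, and the record fields mirror those
/// used by this crate's tests:
///
/// ```no_run
/// use streambed::commit_log::{CommitLog, ProducerRecord, Subscription, Topic};
/// use streambed_logged::FileLog;
///
/// # async fn example() {
/// let cl = FileLog::new("/var/lib/logged");
/// let topic = Topic::from("my-topic");
///
/// // Produce a single record to the topic.
/// cl.produce(ProducerRecord {
///     topic: topic.clone(),
///     headers: vec![],
///     timestamp: None,
///     key: 0,
///     value: b"some-value".to_vec(),
///     partition: 0,
/// })
/// .await
/// .unwrap();
///
/// // Subscribe from the beginning of the topic; the returned stream yields
/// // consumer records as they are appended.
/// let _records = cl.scoped_subscribe("some-consumer", vec![], vec![Subscription { topic }], None);
/// # }
/// ```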
#[derive(Clone)]
pub struct FileLog {
    compactor_txs: ShareableTopicMap<mpsc::Sender<u64>>,
    compaction_threshold_size: u64,
    compaction_write_buffer_size: usize,
    max_record_size: usize,
    read_buffer_size: usize,
    producer_txs: ShareableTopicMap<mpsc::Sender<ProduceRequest>>,
    pub(crate) topic_file_ops: ShareableTopicMap<TopicFileOp>,
    root_path: PathBuf,
    write_buffer_size: usize,
}

#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
pub struct StorableHeader {
    key: HeaderKey,
    value: Vec<u8>,
}

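/// The single-record form that is persisted to a topic file. Each record is
/// serialized with postcard and suffixed with a CRC32 (ISCSI) checksum via
/// `postcard::to_stdvec_crc32`, so corruption can be detected on replay.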
#[serde_as]
#[derive(Clone, Deserialize, Debug, Eq, PartialEq, Serialize)]
struct StorableRecord {
    version: u32,
    headers: Vec<StorableHeader>,
    #[serde_as(as = "Option<TimestampSecondsWithFrac>")]
    timestamp: Option<DateTime<Utc>>,
    key: u64,
    value: Vec<u8>,
    offset: u64,
}

/// Some unrecoverable issue when attempting to register compaction.
#[derive(Debug)]
pub struct CompactionRegistrationError;

impl FileLog {
    /// Construct a new file log that will also spawn a task for each
    /// topic being produced.
    pub fn new<P>(root_path: P) -> Self
    where
        P: Into<PathBuf>,
    {
        Self::with_config(root_path, 64 * 1024, 8192, 64 * 1024, 8 * 1024, 16 * 1024)
    }

    /// Construct a new file log that will also spawn a task for each
    /// topic being produced. The `compaction_threshold_size` is the size, in bytes,
    /// that the active file must reach before the compactor decides to perform a
    /// compaction. This typically equates to the block size on disk,
    /// e.g. 64KB for flash-based storage. 64KB is still small enough that scans
    /// over a topic are relatively fast, working on the principle of holding roughly
    /// 2,000 records. We also require read and write buffer sizes to reduce
    /// system calls. When writing, data is flushed either when the buffer reaches
    /// capacity or when there is no further write to perform.
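    ///
    /// A sketch of explicit configuration; the values shown are the defaults used by
    /// [`FileLog::new`] and the path is illustrative only:
    ///
    /// ```no_run
    /// use streambed_logged::FileLog;
    ///
    /// let cl = FileLog::with_config(
    ///     "/var/lib/logged",
    ///     64 * 1024, // compaction_threshold_size
    ///     8192,      // read_buffer_size
    ///     64 * 1024, // compaction_write_buffer_size
    ///     8 * 1024,  // write_buffer_size
    ///     16 * 1024, // max_record_size
    /// );
    /// ```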
    pub fn with_config<P>(
        root_path: P,
        compaction_threshold_size: u64,
        read_buffer_size: usize,
        compaction_write_buffer_size: usize,
        write_buffer_size: usize,
        max_record_size: usize,
    ) -> Self
    where
        P: Into<PathBuf>,
    {
        Self {
            compactor_txs: Arc::new(Mutex::new(HashMap::new())),
            compaction_threshold_size,
            compaction_write_buffer_size,
            max_record_size,
            read_buffer_size,
            root_path: root_path.into(),
            producer_txs: Arc::new(Mutex::new(HashMap::new())),
            topic_file_ops: Arc::new(Mutex::new(HashMap::new())),
            write_buffer_size,
        }
    }

    /// Frees resources associated with a topic, but not any associated compaction.
    /// Invoking this method is benign: if consuming or producing occurs on the
    /// topic after closing, its resources will be re-established.
    pub fn close_topic(&mut self, topic: &Topic) {
        if let Ok(mut locked_producer_txs) = self.producer_txs.lock() {
            locked_producer_txs.remove(topic);
        }
        if let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() {
            locked_topic_file_ops.remove(topic);
        }
    }

    /// Register compaction for a given topic. Any previously registered compaction
    /// is replaced. A new task for compaction will be created in the background.
    ///
    /// Compaction's memory can be controlled somewhat through `compaction_write_buffer_size`
    /// when creating this file commit log. This buffer size is selected to minimize
    /// writing to flash and will be allocated once per topic compaction registered here.
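    ///
    /// A sketch of registering compaction, retaining a single record per key as this
    /// crate's tests do; the retention count and path are illustrative only:
    ///
    /// ```no_run
    /// use streambed::commit_log::Topic;
    /// use streambed_logged::{compaction, FileLog};
    ///
    /// # async fn example() {
    /// let mut cl = FileLog::new("/var/lib/logged");
    /// cl.register_compaction(Topic::from("my-topic"), compaction::KeyBasedRetention::new(1))
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```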
    pub async fn register_compaction<CS>(
        &mut self,
        topic: Topic,
        compaction_strategy: CS,
    ) -> Result<(), CompactionRegistrationError>
    where
        CS: CompactionStrategy + Send + Sync + 'static,
    {
        let topic_file_op = {
            let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                return Err(CompactionRegistrationError);
            };
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops)
        };

        let mut age_active_file_topic_file_op = topic_file_op.clone();
        let age_active_file_read_buffer_size = self.read_buffer_size;
        let age_active_file_max_record_size = self.max_record_size;
        let new_work_file_topic_file_op = topic_file_op.clone();
        let recover_history_files_topic_file_op = topic_file_op.clone();
        let replace_history_files_topic_file_op = topic_file_op;

        let compaction_write_buffer_size = self.compaction_write_buffer_size;

        let mut compactor = Compactor::new(
            compaction_strategy,
            self.compaction_threshold_size,
            ScopedTopicSubscriber::new(self.clone(), topic.clone()),
            TopicStorageOps::new(
                move || {
                    age_active_file_topic_file_op.age_active_file()?;
                    find_offset(
                        &age_active_file_topic_file_op,
                        age_active_file_read_buffer_size,
                        age_active_file_max_record_size,
                        true,
                    )
                    .map(|o| o.map(|o| o.end_offset))
                    .map_err(TopicFileOpError::IoError)
                },
                move || new_work_file_topic_file_op.new_work_file(compaction_write_buffer_size),
                move || recover_history_files_topic_file_op.recover_history_files(),
                move || replace_history_files_topic_file_op.replace_history_files(),
            ),
        );

        let (compactor_tx, mut compactor_rx) = mpsc::channel::<u64>(COMPACTOR_QUEUE_SIZE);

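        // Drive the compactor from a dedicated task: each value received is the current
        // size of the topic's active file. While a compaction is in progress, keep
        // stepping at COMPACTOR_WRITE_POLL intervals even if no new size arrives.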
        tokio::spawn(async move {
            let mut recv = compactor_rx.recv().await;
            while let Some(active_file_size) = recv {
                compactor.step(active_file_size).await;
                if compactor.is_idle() {
                    recv = compactor_rx.recv().await;
                } else if let Ok(r) = time::timeout(COMPACTOR_WRITE_POLL, compactor_rx.recv()).await
                {
                    recv = r;
                }
            }
        });

        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.insert(topic, compactor_tx);
        }

        Ok(())
    }

    /// Unregister compaction for a given topic
    pub fn unregister_compaction(&mut self, topic: &Topic) {
        if let Ok(mut compactors) = self.compactor_txs.lock() {
            compactors.remove(topic);
        }
    }
}

#[async_trait]
impl CommitLog for FileLog {
    async fn offsets(&self, topic: Topic, _partition: Partition) -> Option<PartitionOffsets> {
        let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
            return None;
        };
        let topic_file_op =
            acquire_topic_file_ops(&self.root_path, &topic, &mut locked_topic_file_ops);
        drop(locked_topic_file_ops);

        find_offset(
            &topic_file_op,
            self.read_buffer_size,
            self.max_record_size,
            false,
        )
        .ok()
        .flatten()
    }

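    // Producing lazily spawns one long-lived task per topic on first use. That task
    // owns the active file, serializes records with a CRC32, batches flushes on the
    // TOPIC_FILE_PRODUCER_FLUSH timeout, and replies to each caller via a oneshot.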
    async fn produce(&self, record: ProducerRecord) -> ProduceReply {
        let topic_producer = {
            let Ok(mut locked_producer_map) = self.producer_txs.lock() else {
                return Err(ProducerError::CannotProduce);
            };
            if let Some(topic_producer) = locked_producer_map.get(&record.topic) {
                let producer_tx = topic_producer.clone();
                drop(locked_producer_map);
                producer_tx
            } else {
                let (producer_tx, mut producer_rx) =
                    mpsc::channel::<ProduceRequest>(PRODUCER_QUEUE_SIZE);
                locked_producer_map.insert(record.topic.clone(), producer_tx.clone());
                drop(locked_producer_map); // drop early so we don't double-lock with the next thing

                let Ok(mut locked_topic_file_ops) = self.topic_file_ops.lock() else {
                    return Err(ProducerError::CannotProduce);
                };
                let mut topic_file_op = acquire_topic_file_ops(
                    &self.root_path,
                    &record.topic,
                    &mut locked_topic_file_ops,
                );
                drop(locked_topic_file_ops);

                let found_offsets = match find_offset(
                    &topic_file_op,
                    self.read_buffer_size,
                    self.max_record_size,
                    false,
                ) {
                    r @ Ok(_) => r,
                    Err(e) => {
                        error!("Error {e} when producing. Attempting to recover by truncating the active file.");

                        if let Err(e) = recover_active_file(
                            &mut topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                        ) {
                            error!("Error {e} when recovering. Unable to recover the active file.")
                        }

                        find_offset(
                            &topic_file_op,
                            self.read_buffer_size,
                            self.max_record_size,
                            false,
                        )
                    }
                };
                let mut next_offset = found_offsets
                    .map(|offsets| offsets.map_or(0, |offsets| offsets.end_offset.wrapping_add(1)));

                let task_root_path = self.root_path.clone();
                let task_compactor_txs = self.compactor_txs.clone();
                let task_topic_file_ops = self.topic_file_ops.clone();
                let task_write_buffer_size = self.write_buffer_size;

                let mut open_options = fs::OpenOptions::new();
                open_options.append(true).create(true);

                let mut file_size = topic_file_op
                    .active_file_size(&open_options, task_write_buffer_size)
                    .unwrap_or_default();

                tokio::spawn({
                    async move {
                        let mut recv = producer_rx.recv().await;
                        while let Some((record, reply_to)) = recv {
                            if let Ok(next_offset) = &mut next_offset {
                                let topic_file_op = {
                                    if let Ok(mut locked_topic_file_ops) =
                                        task_topic_file_ops.lock()
                                    {
                                        Some(acquire_topic_file_ops(
                                            &task_root_path,
                                            &record.topic,
                                            &mut locked_topic_file_ops,
                                        ))
                                    } else {
                                        None
                                    }
                                };
                                if let Some(mut topic_file_op) = topic_file_op {
                                    let r = topic_file_op.with_active_file(
                                        &open_options,
                                        task_write_buffer_size,
                                        |file| {
                                            let storable_record = StorableRecord {
                                                version: 0,
                                                headers: record
                                                    .headers
                                                    .into_iter()
                                                    .map(|h| StorableHeader {
                                                        key: h.key,
                                                        value: h.value,
                                                    })
                                                    .collect(),
                                                timestamp: record.timestamp,
                                                key: record.key,
                                                value: record.value,
                                                offset: *next_offset,
                                            };

                                            trace!("Producing record: {:?}", storable_record);

                                            if let Ok(buf) = postcard::to_stdvec_crc32(
                                                &storable_record,
                                                CRC.digest(),
                                            ) {
                                                file.write_all(&buf)
                                                    .map_err(TopicFileOpError::IoError)
                                                    .map(|_| buf.len())
                                            } else {
                                                Err(TopicFileOpError::CannotSerialize)
                                            }
                                        },
                                    );

                                    if let Ok((bytes_written, is_new_active_file)) = r {
                                        let _ = reply_to.send(Ok(ProducedOffset {
                                            offset: *next_offset,
                                        }));

                                        *next_offset = next_offset.wrapping_add(1);

                                        if is_new_active_file {
                                            file_size = 0;
                                        }
                                        file_size = file_size.wrapping_add(bytes_written as u64);

                                        let compactor_tx = {
                                            if let Ok(locked_task_compactor_txs) =
                                                task_compactor_txs.lock()
                                            {
                                                locked_task_compactor_txs
                                                    .get(&record.topic)
                                                    .cloned()
                                            } else {
                                                None
                                            }
                                        };
                                        if let Some(compactor_tx) = compactor_tx {
                                            let _ = compactor_tx.send(file_size).await;
                                        }

                                        match time::timeout(
                                            TOPIC_FILE_PRODUCER_FLUSH,
                                            producer_rx.recv(),
                                        )
                                        .await
                                        {
                                            Ok(r) => recv = r,
                                            Err(_) => {
                                                let _ = topic_file_op.flush_active_file();
                                                recv = producer_rx.recv().await;
                                            }
                                        }

                                        continue;
                                    }
                                }
                            }

                            let _ = reply_to.send(Err(ProducerError::CannotProduce));
                            recv = producer_rx.recv().await;
                        }
                    }
                });

                producer_tx
            }
        };

        let (reply_tx, reply_rx) = oneshot::channel();
        if topic_producer.send((record, reply_tx)).await.is_ok() {
            if let Ok(reply) = reply_rx.await {
                reply
            } else {
                Err(ProducerError::CannotProduce)
            }
        } else {
            Err(ProducerError::CannotProduce)
        }
    }

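    // Each subscription spawns a task that tails the topic's files, polling for new
    // writes at TOPIC_FILE_CONSUMER_POLL intervals and forwarding records newer than
    // the supplied consumer offset into the channel backing the returned stream.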
    fn scoped_subscribe<'a>(
        &'a self,
        _consumer_group_name: &str,
        offsets: Vec<ConsumerOffset>,
        subscriptions: Vec<Subscription>,
        idle_timeout: Option<Duration>,
    ) -> Pin<Box<dyn Stream<Item = ConsumerRecord> + Send + 'a>> {
        let offsets = offsets
            .iter()
            .map(|e| {
                assert_eq!(e.partition, 0);
                (e.topic.to_owned(), e.offset)
            })
            .collect::<HashMap<Topic, u64>>();

        let (tx, mut rx) = mpsc::channel(CONSUMER_QUEUE_SIZE);

        let mut open_options = OpenOptions::new();
        open_options.read(true);

        for s in subscriptions {
            let task_root_path = self.root_path.clone();
            let task_topic = s.topic.clone();
            let mut task_offset = offsets.get(&s.topic).copied();
            let task_tx = tx.clone();
            let task_read_buffer_size = self.read_buffer_size;
            let task_max_record_size = self.max_record_size;
            let task_topic_file_ops = self.topic_file_ops.clone();
            let task_open_options = open_options.clone();
            tokio::spawn(async move {
                let mut buf = BytesMut::with_capacity(task_read_buffer_size);
                let mut decoder = StorableRecordDecoder::new(task_max_record_size);
                let mut last_modification_time = None;
                'outer: loop {
                    buf.clear();

                    let topic_file_op = {
                        let Ok(mut locked_topic_file_ops) = task_topic_file_ops.lock() else {
                            break;
                        };
                        let topic_file_op = acquire_topic_file_ops(
                            &task_root_path,
                            &task_topic,
                            &mut locked_topic_file_ops,
                        );
                        drop(locked_topic_file_ops);
                        topic_file_op
                    };

                    let modification_time = topic_file_op.modification_time();
                    if modification_time <= last_modification_time {
                        if task_tx.is_closed() {
                            break;
                        }
                        time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                        continue;
                    } else {
                        last_modification_time = modification_time;
                    }

                    let mut topic_files = topic_file_op
                        .open_files(task_open_options.clone(), false)
                        .into_iter();
                    match topic_files.next() {
                        Some(Ok(mut topic_file)) => loop {
                            let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                                break;
                            };

                            let decode_fn = if len == 0 {
                                StorableRecordDecoder::decode_eof
                            } else {
                                StorableRecordDecoder::decode
                            };
                            let mut r = decode_fn(&mut decoder, &mut buf);
                            while let Ok(Some(record)) = r {
                                if task_offset.is_none() || Some(record.offset) > task_offset {
                                    let consumer_record = ConsumerRecord {
                                        topic: task_topic.clone(),
                                        headers: record
                                            .headers
                                            .into_iter()
                                            .map(|h| Header {
                                                key: h.key,
                                                value: h.value,
                                            })
                                            .collect(),
                                        timestamp: record.timestamp,
                                        key: record.key,
                                        value: record.value,
                                        partition: 0,
                                        offset: record.offset,
                                    };

                                    trace!("Consumed record: {:?}", consumer_record);

                                    if task_tx.send(consumer_record).await.is_err() {
                                        break 'outer;
                                    }

                                    task_offset = Some(record.offset)
                                }

                                r = decode_fn(&mut decoder, &mut buf);
                            }
                            match r {
                                Ok(Some(_)) => (), // Should never happen
                                Ok(None) if len == 0 => match topic_files.next() {
                                    Some(Ok(tf)) => topic_file = tf,
                                    Some(Err(e)) => {
                                        warn!("Error consuming topic file: {e} - aborting subscription for {task_topic}");
                                        break 'outer;
                                    }
                                    None => {
                                        if task_tx.is_closed() {
                                            break 'outer;
                                        }
                                        time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                        continue 'outer;
                                    }
                                },
                                Ok(None) => (),
                                Err(e) => {
                                    if task_tx.is_closed() {
                                        break 'outer;
                                    }
                                    trace!("Topic is corrupt for {topic_file:?}. Error {e} occurred when subscribed. Retrying.");
                                    time::sleep(TOPIC_FILE_CONSUMER_POLL).await;
                                    continue 'outer;
                                }
                            }
                        },
                        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                        Some(Err(e)) => {
                            warn!("Error reading topic file: {e} - aborting subscription");
                        }
                        None => {
                            if task_tx.is_closed() {
                                break;
                            }
                            time::sleep(TOPIC_FILE_CONSUMER_POLL).await
                        }
                    }
                }
            });
        }

        Box::pin(stream!({
            if let Some(it) = idle_timeout {
                while let Some(record) = time::timeout(it, rx.recv()).await.ok().flatten() {
                    yield record;
                }
            } else {
                while let Some(record) = rx.recv().await {
                    yield record;
                }
            }
            trace!("Ending subscriptions");
        }))
    }
}

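/// Returns the shared `TopicFileOp` for a topic, lazily creating and registering
/// one the first time the topic is seen.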
fn acquire_topic_file_ops(
    root_path: &Path,
    topic: &Topic,
    topic_file_ops: &mut HashMap<Topic, TopicFileOp>,
) -> TopicFileOp {
    if let Some(topic_file_op) = topic_file_ops.get(topic) {
        topic_file_op.clone()
    } else {
        let topic = topic.clone();
        let topic_file_op = TopicFileOp::new(root_path.to_path_buf(), topic.clone());
        topic_file_ops.insert(topic, topic_file_op.clone());
        topic_file_op
    }
}

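/// Scans a topic's files to determine the first and last offsets present,
/// optionally excluding the active file. Returns `Ok(None)` when no topic file
/// exists yet.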
fn find_offset(
    topic_file_op: &TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
    exclude_active_file: bool,
) -> io::Result<Option<PartitionOffsets>> {
    let mut open_options = OpenOptions::new();
    open_options.read(true);
    let mut topic_files = topic_file_op
        .open_files(open_options, exclude_active_file)
        .into_iter();
    match topic_files.next() {
        Some(Ok(mut topic_file)) => {
            let mut buf = BytesMut::with_capacity(read_buffer_size);
            let mut decoder = StorableRecordDecoder::new(max_record_size);
            let mut beginning_offset = None;
            let mut end_offset = None;
            loop {
                let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
                    break;
                };

                let decode_fn = if len == 0 {
                    StorableRecordDecoder::decode_eof
                } else {
                    StorableRecordDecoder::decode
                };
                while let Some(record) = decode_fn(&mut decoder, &mut buf)? {
                    if beginning_offset.is_none() {
                        beginning_offset = Some(record.offset);
                        end_offset = Some(record.offset);
                    } else {
                        end_offset = Some(record.offset);
                    }
                }
                if len == 0 {
                    match topic_files.next() {
                        Some(Ok(tf)) => topic_file = tf,
                        Some(Err(e)) => return Err(e),
                        None => break,
                    }
                }
            }
            Ok(Some(PartitionOffsets {
                beginning_offset: beginning_offset.unwrap_or(0),
                end_offset: end_offset.unwrap_or(0),
            }))
        }
        Some(Err(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Some(Err(e)) => Err(e),
        None => Ok(None),
    }
}

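/// Attempts to repair a corrupt active file by scanning it from the start and,
/// on the first record that fails to decode, truncating the file back to the end
/// of the last record that decoded cleanly.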
fn recover_active_file(
    topic_file_op: &mut TopicFileOp,
    read_buffer_size: usize,
    max_record_size: usize,
) -> Result<(), TopicFileOpError> {
    let mut open_options = OpenOptions::new();
    open_options.read(true).write(true);
    let mut topic_file = topic_file_op.open_active_file(open_options)?;
    let mut buf = BytesMut::with_capacity(read_buffer_size);
    let mut decoder = StorableRecordDecoder::new(max_record_size);
    let mut bytes_read = None;
    loop {
        let Ok(len) = read_buf(&mut topic_file, &mut buf) else {
            break;
        };

        let before_decode_len = buf.len();

        let decode_fn = if len == 0 {
            StorableRecordDecoder::decode_eof
        } else {
            StorableRecordDecoder::decode
        };
        let mut r = decode_fn(&mut decoder, &mut buf);
        while let Ok(Some(_)) = r {
            r = decode_fn(&mut decoder, &mut buf);
        }
        match r {
            Ok(None) if len == 0 => break,
            Ok(_) => (),
            Err(_) => {
                if let Some(bytes_read) = bytes_read {
                    topic_file
                        .set_len(bytes_read)
                        .map_err(TopicFileOpError::IoError)?;
                }
                break;
            }
        }

        let consumed_bytes = (before_decode_len - buf.len()) as u64;
        bytes_read =
            bytes_read.map_or_else(|| Some(consumed_bytes), |br| br.checked_add(consumed_bytes));
    }
    Ok(())
}

// Similar to Tokio's AsyncReadExt [`read_buf`](https://docs.rs/tokio/latest/tokio/io/trait.AsyncReadExt.html#method.read_buf).
// Thanks to Alice Ryhl: https://discord.com/channels/500028886025895936/627696030334582784/1071037851980021761
fn read_buf<B>(file: &mut File, buf: &mut B) -> io::Result<usize>
where
    B: BufMut,
{
    let chunk = buf.chunk_mut();
    let len = chunk.len();
    let ptr = chunk.as_mut_ptr();
    let unused_buf = unsafe { slice::from_raw_parts_mut(ptr, len) };
    let result = file.read(unused_buf);
    if let Ok(len) = result {
        unsafe {
            buf.advance_mut(len);
        }
    }
    result
}

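/// Decodes postcard-encoded, CRC32-suffixed `StorableRecord`s from a byte buffer.
/// An unexpected-end error is treated as "more bytes required" as long as the
/// buffered data does not exceed `max_record_size`; any other failure is surfaced
/// as invalid data.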
struct StorableRecordDecoder {
    max_record_size: usize,
}

impl StorableRecordDecoder {
    pub fn new(max_record_size: usize) -> Self {
        Self { max_record_size }
    }
}

impl Decoder for StorableRecordDecoder {
    type Item = StorableRecord;

    type Error = std::io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        let result = postcard::take_from_bytes_crc32::<StorableRecord>(src, CRC.digest());
        match result {
            Ok((record, remaining)) => {
                src.advance(src.len() - remaining.len());
                Ok(Some(record))
            }
            Err(e)
                if e == postcard::Error::DeserializeUnexpectedEnd
                    && src.len() <= self.max_record_size =>
            {
                Ok(None)
            }
            Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{env, sync::Arc};

    use test_log::test;
    use tokio::{fs, sync::Notify};
    use tokio_stream::StreamExt;

    use super::*;

    #[test(tokio::test)]
    async fn test_produce_consume() {
        let logged_dir = env::temp_dir().join("test_produce_consume");
        // Deliberately converting to a String to test Into<PathBuf>
        let logged_dir = logged_dir.to_string_lossy().to_string();
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir}");

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic, 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_produce_consume_with_split() {
        let logged_dir = env::temp_dir().join("test_produce_consume_with_split");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let mut cl = FileLog::new(logged_dir.clone());
        let mut task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        cl.register_compaction(topic.clone(), compaction::KeyBasedRetention::new(1))
            .await
            .unwrap();

        assert!(cl.offsets(topic.clone(), 0).await.is_none());

        let task_topic = topic.clone();
        tokio::spawn(async move {
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-0".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();
            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-1".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            // At this point we're going to pretend we've performed a compaction
            // that would result in the commit log file being split into a .history
            // file.

            let mut topic_file_op = {
                let locked_topic_file_ops = task_cl.topic_file_ops.lock().unwrap();
                locked_topic_file_ops.get(&task_topic).unwrap().clone()
            };
            topic_file_op.age_active_file().unwrap();

            task_cl
                .produce(ProducerRecord {
                    topic: task_topic.clone(),
                    headers: vec![],
                    timestamp: None,
                    key: 0,
                    value: b"some-value-2".to_vec(),
                    partition: 0,
                })
                .await
                .unwrap();

            time::sleep(TOPIC_FILE_PRODUCER_FLUSH * 2).await;
            let offsets = task_cl.offsets(task_topic.clone(), 0).await.unwrap();
            assert_eq!(
                offsets,
                PartitionOffsets {
                    beginning_offset: 0,
                    end_offset: 2
                }
            );

            let topic_file_path = logged_dir.join(task_topic.as_str());
            assert!(topic_file_path.exists());
            assert!(topic_file_path
                .with_extension(topic_file_op::HISTORY_FILE_EXTENSION)
                .exists());

            task_cl.close_topic(&task_topic);
        });

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-2".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }

    #[test(tokio::test)]
    async fn test_consume_wait_for_append() {
        let logged_dir = env::temp_dir().join("test_consume_wait_for_append");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {}", logged_dir.to_string_lossy());

        let cl = FileLog::new(logged_dir);
        let task_cl = cl.clone();

        let topic = Topic::from("my-topic");

        let subscribing = Arc::new(Notify::new());
        let task_subscribing = subscribing.clone();

        let produced = Arc::new(Notify::new());
        let task_produced = produced.clone();

        let task_topic = topic.clone();
        tokio::spawn(async move {
            let subscriptions = vec![Subscription { topic: task_topic }];
            let mut records =
                task_cl.scoped_subscribe("some-consumer", vec![], subscriptions, None);
            task_subscribing.notify_one();

            while records.next().await.is_some() {
                task_produced.notify_one();
            }
        });

        subscribing.notified().await;
        time::sleep(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)).await;

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        produced.notified().await;
    }

    #[test(tokio::test)]
    async fn test_consume_with_idle() {
        let logged_dir = env::temp_dir().join("test_consume_with_idle");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;

        let cl = FileLog::new(logged_dir);

        let topic = Topic::from("my-topic");

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 1,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            offsets,
            subscriptions,
            Some(Duration::from_millis(100)),
        );
        assert!(records.next().await.is_none());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let subscriptions = vec![Subscription { topic }];
        let mut records = cl.scoped_subscribe(
            "some-consumer",
            vec![],
            subscriptions,
            Some(TOPIC_FILE_CONSUMER_POLL + Duration::from_millis(500)),
        );
        assert!(records.next().await.is_some());
        assert!(records.next().await.is_none());
    }

    #[test(tokio::test)]
    async fn test_recovery() {
        let logged_dir = env::temp_dir().join("test_recovery");
        let _ = fs::remove_dir_all(&logged_dir).await;
        let _ = fs::create_dir_all(&logged_dir).await;
        println!("Writing to {logged_dir:?}");

        let cl = FileLog::new(logged_dir.clone());

        let topic = Topic::from("my-topic");

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-0".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-1".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();
        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-2".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        // Ensure everything gets flushed out and left in a good state.
        drop(cl);

        // Now corrupt the log by knocking a few bytes off the end

        let topic_file_path = logged_dir.join(topic.as_str());
        let topic_file = fs::OpenOptions::new()
            .write(true)
            .open(topic_file_path)
            .await
            .unwrap();

        let len = topic_file.metadata().await.unwrap().len();
        topic_file.set_len(len - 2).await.unwrap();

        // Start producing to a new log - simulates a restart.

        let cl = FileLog::new(logged_dir.clone());

        cl.produce(ProducerRecord {
            topic: topic.clone(),
            headers: vec![],
            timestamp: None,
            key: 0,
            value: b"some-value-3".to_vec(),
            partition: 0,
        })
        .await
        .unwrap();

        let offsets = vec![ConsumerOffset {
            topic: topic.clone(),
            partition: 0,
            offset: 0,
        }];
        let subscriptions = vec![Subscription {
            topic: topic.clone(),
        }];
        let mut records = cl.scoped_subscribe("some-consumer", offsets, subscriptions, None);

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic: topic.clone(),
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-1".to_vec(),
                partition: 0,
                offset: 1
            })
        );

        assert_eq!(
            records.next().await,
            Some(ConsumerRecord {
                topic,
                headers: vec![],
                timestamp: None,
                key: 0,
                value: b"some-value-3".to_vec(),
                partition: 0,
                offset: 2
            })
        );
    }
}