value_log/segment/
multi_writer.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::writer::Writer;
6use crate::{
7    compression::Compressor,
8    id::{IdGenerator, SegmentId},
9    ValueHandle,
10};
11use std::path::{Path, PathBuf};
12
13/// Segment writer, may write multiple segments
14pub struct MultiWriter<C: Compressor + Clone> {
15    folder: PathBuf,
16    target_size: u64,
17
18    writers: Vec<Writer<C>>,
19
20    id_generator: IdGenerator,
21
22    compression: Option<C>,
23}
24
25impl<C: Compressor + Clone> MultiWriter<C> {
26    /// Initializes a new segment writer.
27    ///
28    /// # Errors
29    ///
30    /// Will return `Err` if an IO error occurs.
31    #[doc(hidden)]
32    pub fn new<P: AsRef<Path>>(
33        id_generator: IdGenerator,
34        target_size: u64,
35        folder: P,
36    ) -> std::io::Result<Self> {
37        let folder = folder.as_ref();
38
39        let segment_id = id_generator.next();
40        let segment_path = folder.join(segment_id.to_string());
41
42        Ok(Self {
43            id_generator,
44            folder: folder.into(),
45            target_size,
46
47            writers: vec![Writer::new(segment_path, segment_id)?],
48
49            compression: None,
50        })
51    }
52
53    /// Sets the compression method
54    #[must_use]
55    #[doc(hidden)]
56    pub fn use_compression(mut self, compressor: Option<C>) -> Self {
57        self.compression.clone_from(&compressor);
58        self.get_active_writer_mut().compression = compressor;
59        self
60    }
61
62    #[doc(hidden)]
63    #[must_use]
64    pub fn get_active_writer(&self) -> &Writer<C> {
65        // NOTE: initialized in constructor
66        #[allow(clippy::expect_used)]
67        self.writers.last().expect("should exist")
68    }
69
70    fn get_active_writer_mut(&mut self) -> &mut Writer<C> {
71        // NOTE: initialized in constructor
72        #[allow(clippy::expect_used)]
73        self.writers.last_mut().expect("should exist")
74    }
75
76    /// Returns the [`ValueHandle`] for the next written blob.
77    ///
78    /// This can be used to index an item into an external `Index`.
79    #[must_use]
80    pub fn get_next_value_handle(&self) -> ValueHandle {
81        ValueHandle {
82            offset: self.offset(),
83            segment_id: self.segment_id(),
84        }
85    }
86
87    #[doc(hidden)]
88    #[must_use]
89    pub fn offset(&self) -> u64 {
90        self.get_active_writer().offset()
91    }
92
93    #[must_use]
94    fn segment_id(&self) -> SegmentId {
95        self.get_active_writer().segment_id()
96    }
97
98    /// Sets up a new writer for the next segment
99    fn rotate(&mut self) -> crate::Result<()> {
100        log::debug!("Rotating segment writer");
101
102        let new_segment_id = self.id_generator.next();
103        let segment_path = self.folder.join(new_segment_id.to_string());
104
105        let new_writer =
106            Writer::new(segment_path, new_segment_id)?.use_compression(self.compression.clone());
107
108        self.writers.push(new_writer);
109
110        Ok(())
111    }
112
113    /// Writes an item.
114    ///
115    /// # Errors
116    ///
117    /// Will return `Err` if an IO error occurs.
118    pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
119        &mut self,
120        key: K,
121        value: V,
122    ) -> crate::Result<u32> {
123        let key = key.as_ref();
124        let value = value.as_ref();
125
126        let target_size = self.target_size;
127
128        // Write actual value into segment
129        let writer = self.get_active_writer_mut();
130        let bytes_written = writer.write(key, value)?;
131
132        // Check for segment size target, maybe rotate to next writer
133        if writer.offset() >= target_size {
134            writer.flush()?;
135            self.rotate()?;
136        }
137
138        Ok(bytes_written)
139    }
140
141    pub(crate) fn finish(mut self) -> crate::Result<Vec<Writer<C>>> {
142        let writer = self.get_active_writer_mut();
143
144        if writer.item_count > 0 {
145            writer.flush()?;
146        }
147
148        // IMPORTANT: We cannot finish the index writer here
149        // The writers first need to be registered into the value log
150
151        Ok(self.writers)
152    }
153}