value_log/segment/
multi_writer.rs1use super::writer::Writer;
6use crate::{
7 compression::Compressor,
8 id::{IdGenerator, SegmentId},
9 ValueHandle,
10};
11use std::path::{Path, PathBuf};
12
13pub struct MultiWriter<C: Compressor + Clone> {
15 folder: PathBuf,
16 target_size: u64,
17
18 writers: Vec<Writer<C>>,
19
20 id_generator: IdGenerator,
21
22 compression: Option<C>,
23}
24
25impl<C: Compressor + Clone> MultiWriter<C> {
26 #[doc(hidden)]
32 pub fn new<P: AsRef<Path>>(
33 id_generator: IdGenerator,
34 target_size: u64,
35 folder: P,
36 ) -> std::io::Result<Self> {
37 let folder = folder.as_ref();
38
39 let segment_id = id_generator.next();
40 let segment_path = folder.join(segment_id.to_string());
41
42 Ok(Self {
43 id_generator,
44 folder: folder.into(),
45 target_size,
46
47 writers: vec![Writer::new(segment_path, segment_id)?],
48
49 compression: None,
50 })
51 }
52
53 #[must_use]
55 #[doc(hidden)]
56 pub fn use_compression(mut self, compressor: Option<C>) -> Self {
57 self.compression.clone_from(&compressor);
58 self.get_active_writer_mut().compression = compressor;
59 self
60 }
61
62 #[doc(hidden)]
63 #[must_use]
64 pub fn get_active_writer(&self) -> &Writer<C> {
65 #[allow(clippy::expect_used)]
67 self.writers.last().expect("should exist")
68 }
69
70 fn get_active_writer_mut(&mut self) -> &mut Writer<C> {
71 #[allow(clippy::expect_used)]
73 self.writers.last_mut().expect("should exist")
74 }
75
76 #[must_use]
80 pub fn get_next_value_handle(&self) -> ValueHandle {
81 ValueHandle {
82 offset: self.offset(),
83 segment_id: self.segment_id(),
84 }
85 }
86
87 #[doc(hidden)]
88 #[must_use]
89 pub fn offset(&self) -> u64 {
90 self.get_active_writer().offset()
91 }
92
93 #[must_use]
94 fn segment_id(&self) -> SegmentId {
95 self.get_active_writer().segment_id()
96 }
97
98 fn rotate(&mut self) -> crate::Result<()> {
100 log::debug!("Rotating segment writer");
101
102 let new_segment_id = self.id_generator.next();
103 let segment_path = self.folder.join(new_segment_id.to_string());
104
105 let new_writer =
106 Writer::new(segment_path, new_segment_id)?.use_compression(self.compression.clone());
107
108 self.writers.push(new_writer);
109
110 Ok(())
111 }
112
113 pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
119 &mut self,
120 key: K,
121 value: V,
122 ) -> crate::Result<u32> {
123 let key = key.as_ref();
124 let value = value.as_ref();
125
126 let target_size = self.target_size;
127
128 let writer = self.get_active_writer_mut();
130 let bytes_written = writer.write(key, value)?;
131
132 if writer.offset() >= target_size {
134 writer.flush()?;
135 self.rotate()?;
136 }
137
138 Ok(bytes_written)
139 }
140
141 pub(crate) fn finish(mut self) -> crate::Result<Vec<Writer<C>>> {
142 let writer = self.get_active_writer_mut();
143
144 if writer.item_count > 0 {
145 writer.flush()?;
146 }
147
148 Ok(self.writers)
152 }
153}