// value_log/segment/multi_writer.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::writer::Writer;
use crate::{
    compression::Compressor,
    id::{IdGenerator, SegmentId},
    ValueHandle,
};
use std::path::{Path, PathBuf};

/// Segment writer, may write multiple segments
///
/// Items are written to the most recently created ("active") segment writer.
/// Once the active writer's offset reaches `target_size`, it is flushed and a
/// writer for a fresh segment is appended.
pub struct MultiWriter<C: Compressor + Clone> {
    /// Directory in which segment files are created; each segment's path is
    /// this folder joined with the segment ID's string form.
    folder: PathBuf,
    /// Offset threshold checked after every write; when the active writer's
    /// offset is at or above this value, the writer is rotated.
    target_size: u64,

    /// All segment writers created so far, in creation order.
    /// The last element is the active writer; the constructor guarantees
    /// there is always at least one.
    writers: Vec<Writer<C>>,

    /// Source of segment IDs for the initial and all subsequently rotated writers.
    id_generator: IdGenerator,

    /// Compressor handed to every writer created on rotation.
    /// `None` until `use_compression` is called.
    compression: Option<C>,
}

impl<C: Compressor + Clone> MultiWriter<C> {
    /// Creates a multi writer and opens the writer for its first segment.
    ///
    /// The first segment ID is drawn from `id_generator`; the segment file
    /// path is `folder` joined with that ID's string form. No compression is
    /// configured initially.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    #[doc(hidden)]
    pub fn new<P: AsRef<Path>>(
        id_generator: IdGenerator,
        target_size: u64,
        folder: P,
    ) -> std::io::Result<Self> {
        let base_dir = folder.as_ref();

        let first_id = id_generator.next();
        let first_path = base_dir.join(first_id.to_string());
        let first_writer = Writer::new(first_path, first_id)?;

        Ok(Self {
            folder: base_dir.into(),
            target_size,
            writers: vec![first_writer],
            id_generator,
            compression: None,
        })
    }

    /// Sets the compression method
    ///
    /// The compressor is applied to the currently active writer and is also
    /// remembered so that writers created by later rotations receive it.
    #[must_use]
    #[doc(hidden)]
    pub fn use_compression(mut self, compressor: C) -> Self {
        // Keep a copy for future rotations; the active writer gets the original.
        let retained = compressor.clone();
        self.compression = Some(retained);

        let active = self.get_active_writer_mut();
        active.compression = Some(compressor);

        self
    }

    /// Returns a shared reference to the active (most recently created) writer.
    #[doc(hidden)]
    #[must_use]
    pub fn get_active_writer(&self) -> &Writer<C> {
        // NOTE: the constructor always pushes one writer, so the list is never empty
        #[allow(clippy::expect_used)]
        let active = self.writers.last().expect("should exist");

        active
    }

    /// Returns an exclusive reference to the active (most recently created) writer.
    fn get_active_writer_mut(&mut self) -> &mut Writer<C> {
        // NOTE: the constructor always pushes one writer, so the list is never empty
        #[allow(clippy::expect_used)]
        let active = self.writers.last_mut().expect("should exist");

        active
    }

    /// Returns the [`ValueHandle`] for the next written blob.
    ///
    /// The handle combines the active writer's current offset with its
    /// segment ID, and can be used to index an item into an external `Index`.
    #[must_use]
    pub fn get_next_value_handle(&self) -> ValueHandle {
        let offset = self.offset();
        let segment_id = self.segment_id();

        ValueHandle { offset, segment_id }
    }

    /// Returns the current offset of the active writer.
    #[doc(hidden)]
    #[must_use]
    pub fn offset(&self) -> u64 {
        let active = self.get_active_writer();
        active.offset()
    }

    /// Returns the segment ID of the active writer.
    #[must_use]
    fn segment_id(&self) -> SegmentId {
        let active = self.get_active_writer();
        active.segment_id()
    }

    /// Sets up a new writer for the next segment
    ///
    /// The new writer gets a fresh segment ID, a path inside `self.folder`,
    /// and a clone of the configured compression; it becomes the active writer.
    fn rotate(&mut self) -> crate::Result<()> {
        log::debug!("Rotating segment writer");

        let next_id = self.id_generator.next();
        let next_path = self.folder.join(next_id.to_string());

        let fresh = Writer::new(next_path, next_id)?;
        let fresh = fresh.use_compression(self.compression.clone());
        self.writers.push(fresh);

        Ok(())
    }

    /// Writes an item.
    ///
    /// The item goes into the active segment. If, after the write, the active
    /// writer's offset has reached the target size, that writer is flushed and
    /// a writer for a new segment is started.
    ///
    /// Returns the byte count reported by the active writer for this item.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    pub fn write<K: AsRef<[u8]>, V: AsRef<[u8]>>(
        &mut self,
        key: K,
        value: V,
    ) -> crate::Result<u32> {
        let (key, value) = (key.as_ref(), value.as_ref());

        // Copied out up front so it can be compared while the active writer is mutably borrowed
        let size_limit = self.target_size;

        // Write actual value into segment
        let active = self.get_active_writer_mut();
        let bytes_written = active.write(key, value)?;

        // Segment still below its size target: nothing more to do
        if active.offset() < size_limit {
            return Ok(bytes_written);
        }

        // Size target reached: persist this segment, then continue in a new one
        active.flush()?;
        self.rotate()?;

        Ok(bytes_written)
    }

    /// Consumes the multi writer and hands back all segment writers it created.
    ///
    /// The active writer is flushed first, but only if it contains at least
    /// one item.
    pub(crate) fn finish(mut self) -> crate::Result<Vec<Writer<C>>> {
        let active = self.get_active_writer_mut();

        let has_items = active.item_count > 0;
        if has_items {
            active.flush()?;
        }

        // IMPORTANT: We cannot finish the index writer here
        // The writers first need to be registered into the value log

        Ok(self.writers)
    }
}