selium_log/
lib.rs

//! A library containing an implementation of an ordered, log-based message queue.
//!
//! Selium Log aims to provide as simple an abstraction as possible over the message log,
//! so that it is easy to provision and use in your own libraries.
//!
//! The message log's structure should be familiar to those coming from an Apache Kafka background: a log
//! is represented by one or more segments, each consisting of a memory-mapped index file, which serves as
//! a lookup for the segment's records, and an append-only data file containing those records.
//!
//! Only the most recent segment can be designated as the mutable ("hot") segment, while older segments
//! remain read-only until they are eventually cleaned up. Once the current hot segment exceeds the configured
//! [LogConfig::max_index_entries](crate::config::LogConfig::max_index_entries) threshold, it also becomes
//! read-only, and a new segment is created and designated as the hot segment.
//!
//! Replication has not yet been implemented for Selium Log as of this release, but is a planned feature.
//! Because of this, durability can be difficult to achieve alongside high throughput on a single node. Most of
//! the latency comes from the I/O overhead of flushing the memory-mapped index and data files to the filesystem.
//! To compensate for this overhead, the flushing frequency can be tuned via the
//! [FlushPolicy](crate::config::FlushPolicy) struct, striking a balance between durability and throughput.
20
21mod tasks;
22
23pub mod config;
24pub mod data;
25pub mod error;
26pub mod index;
27pub mod message;
28pub mod segment;
29
30use crate::{
31    config::SharedLogConfig,
32    error::{LogError, Result},
33    message::{Message, MessageSlice},
34    segment::SegmentList,
35    tasks::{CleanerTask, FlusherTask},
36};
37use segment::SharedSegmentList;
38use std::{ffi::OsStr, path::Path, sync::Arc};
39use tokio::{
40    fs,
41    sync::{mpsc, RwLock},
42};
43
44/// The entry point to Selium Log.
45///
46/// The MessageLog struct manages creating and opening logs, along with provisioning the
47/// [SegmentList], and coordinates atomic reads and writes.
48///
49/// The MessageLog also creates the Flusher and Cleaner asynchronous tasks, and takes ownership of them to
50/// assure that the tasks are gracefully terminated when the MessageLog instance is destroyed.
51///
52/// # Examples
53/// ```
54/// use anyhow::Result;
55/// use selium_log::{config::LogConfig, message::Message, MessageLog};
56/// use std::sync::Arc;
57///
58/// const MESSAGE_VERSION: u32 = 1;
59///
60/// #[tokio::main]
61/// async fn main() -> Result<()> {
62///     let config = LogConfig::from_path("path/to/segments/dir");
63///     let log = MessageLog::open(Arc::new(config)).await?;
64///     let message = Message::single(b"Hello, world!", MESSAGE_VERSION);
65///
66///     log.write(message).await?;
67///     log.flush().await?;
68///     let slice = log.read_slice(0, None).await?;
69///
70///     if let Some(mut iter) = slice.messages() {
71///         let next = iter.next().await?;
72///         println!("{next:?}")
73///     }
74
75///     Ok(())
76/// }
77/// ```
78#[derive(Debug)]
79pub struct MessageLog {
80    segments: SharedSegmentList,
81    config: SharedLogConfig,
82    flush_interrupt: mpsc::Sender<()>,
83    _flusher: Arc<FlusherTask>,
84    _cleaner: Arc<CleanerTask>,
85}
86
87impl MessageLog {
88    /// Opens a message log at the segments directory configured in the provided `config`
89    /// argument.
90    ///
91    /// If the log directory does not yet exist, it will be created.
92    ///
93    /// Existing segments will be loaded from the filesystem, and provisioned as a
94    /// [SegmentList] instance. If no segments exist in the log directory,
95    /// a single "hot" segment will be created.
96    ///
97    /// The Flusher and Cleaner asynchronous tasks will also be started, and will run in the background.
98    ///
99    /// # Errors
100    /// - Returns [LogError::CreateLogsDirectory] if an error occurs while creating the log directory.
101    /// - Returns Err if an error occurs while constructing the [SegmentList].
102    pub async fn open(config: SharedLogConfig) -> Result<Self> {
103        fs::create_dir_all(&config.segments_path)
104            .await
105            .map_err(LogError::CreateLogsDirectory)?;
106
107        let segments = load_segments(config.clone()).await?;
108        let (_flusher, flush_interrupt) = FlusherTask::start(config.clone(), segments.clone());
109        let _cleaner = CleanerTask::start(config.clone(), segments.clone());
110
111        Ok(Self {
112            segments,
113            config,
114            flush_interrupt,
115            _flusher,
116            _cleaner,
117        })
118    }
119
120    /// Writes the provided [Message] to the current hot segment.
121    ///
122    /// If the hot segment is at full capacity following the write, the current hot segment
123    /// will be flushed, and a new segment will be created and designated as the hot segment in
124    /// its place. Otherwise, the `writes_since_last_flush` field is incremented by 1.
125    ///
126    /// # Errors
127    /// - Returns [LogError::SegmentListEmpty] if there are no segments in the list yet.
128    /// - Returns Err if writing to the hot segment fails.
129    /// - Returns Err if the segment is full, and the current hot segment fails to flush.
130    /// - Returns Err if the segment is full, and the new hot segment fails to be created.
131    pub async fn write(&self, message: Message) -> Result<()> {
132        self.segments.write().await.write(message).await?;
133        self.try_flush().await?;
134        Ok(())
135    }
136
137    /// Reads a range of messages from a segment identified by the provided offset.
138    ///
139    /// Returns an empty [MessageSlice] if the provided offset is greater than the total
140    /// amount of entries in the log.
141    ///
142    /// # Params
143    /// * `offset` - The starting offset, used to locate the segment and relative offset.
144    /// * `limit` - An optional message read limit.
145    pub async fn read_slice(&self, offset: u64, limit: Option<u64>) -> Result<MessageSlice> {
146        self.segments.read().await.read_slice(offset, limit).await
147    }
148
149    /// Flushes the hot segment to the filesystem.
150    /// The Flusher task interval will also be interrupted and reset.
151    ///
152    /// # Errors
153    /// - Returns Err if the hot segment fails to flush.
154    pub async fn flush(&self) -> Result<()> {
155        self.segments.write().await.flush().await?;
156        let _ = self.flush_interrupt.send(()).await;
157        Ok(())
158    }
159
160    /// Retrieves the total number of entries in the log, based on the `end_offset` in the current
161    /// hot segment.
162    pub async fn number_of_entries(&self) -> u64 {
163        self.segments.read().await.number_of_entries()
164    }
165
166    async fn try_flush(&self) -> Result<()> {
167        let segments = self.segments.read().await;
168
169        let should_flush = match self.config.flush_policy.number_of_writes {
170            Some(number_of_writes) => segments.writes_since_last_flush() >= number_of_writes,
171            None => false,
172        };
173
174        drop(segments);
175
176        if should_flush {
177            self.flush().await?;
178        }
179
180        Ok(())
181    }
182}
183
/// Reports whether `path` names an existing regular file (symlinks followed) whose extension
/// is exactly `index`.
fn is_index_file(path: &Path) -> bool {
    let has_index_extension = matches!(path.extension(), Some(ext) if ext == "index");
    has_index_extension && path.is_file()
}
187
188async fn get_offsets(path: impl AsRef<Path>) -> Result<Vec<u64>> {
189    let mut offsets = vec![];
190    let mut entries = fs::read_dir(&path).await.map_err(LogError::LoadSegments)?;
191
192    while let Some(entry) = entries.next_entry().await? {
193        let path = entry.path();
194
195        if is_index_file(&path) {
196            if let Some(offset) = path
197                .file_name()
198                .and_then(OsStr::to_str)
199                .map(|s| s.trim_end_matches(".index"))
200                .and_then(|s| s.parse().ok())
201            {
202                offsets.push(offset);
203            }
204        }
205    }
206
207    Ok(offsets)
208}
209
210async fn load_segments(config: SharedLogConfig) -> Result<SharedSegmentList> {
211    let path = &config.segments_path;
212    let offsets = get_offsets(path).await?;
213
214    let segments = if !offsets.is_empty() {
215        SegmentList::from_offsets(&offsets, config.clone()).await?
216    } else {
217        SegmentList::create(config.clone()).await?
218    };
219
220    let shared_segments = Arc::new(RwLock::new(segments));
221    Ok(shared_segments)
222}