// selium_log/lib.rs
1//! A library containing an implementation of an ordered, log-based message queue.
2//!
3//! Selium Log aims to provide as simple an abstraction as possible over the message log,
4//! in order to make it easy to provision and use in your libraries.
5//!
6//! The message log's structure should be familiar to those coming from an Apache Kafka background: a log
7//! is represented by one or more segments, with each segment comprised of a memory-mapped index file, serving as
8//! a lookup for the segment's records, and an append-only data file containing said records.
9//!
10//! Only the most current segment can be designated as the mutable ("hot") segment, while the older segments
11//! are read-only until their eventual cleanup. Once the current hot segment exceeds the defined
12//! [LogConfig::max_index_entries](crate::config::LogConfig::max_index_entries) threshold, it will also become
13//! read-only, while a new segment is created and assigned as the hot segment.
14//!
15//! Replication has not yet been implemented for Selium Log as of this release, but is a planned feature.
16//! Due to this, durability can be tough to achieve alongside high throughput on a single node. Most of the
17//! latency comes from the I/O overhead of flushing the memory-mapped index and data files to the filesystem.
18//! To compensate for this overhead, the flushing frequency can be tweaked via the
19//! [FlushPolicy](crate::config::FlushPolicy) struct, to strike a balance between durability and throughput.
20
21mod tasks;
22
23pub mod config;
24pub mod data;
25pub mod error;
26pub mod index;
27pub mod message;
28pub mod segment;
29
30use crate::{
31 config::SharedLogConfig,
32 error::{LogError, Result},
33 message::{Message, MessageSlice},
34 segment::SegmentList,
35 tasks::{CleanerTask, FlusherTask},
36};
37use segment::SharedSegmentList;
38use std::{ffi::OsStr, path::Path, sync::Arc};
39use tokio::{
40 fs,
41 sync::{mpsc, RwLock},
42};
43
44/// The entry point to Selium Log.
45///
46/// The MessageLog struct manages creating and opening logs, along with provisioning the
47/// [SegmentList], and coordinates atomic reads and writes.
48///
49/// The MessageLog also creates the Flusher and Cleaner asynchronous tasks, and takes ownership of them to
50/// assure that the tasks are gracefully terminated when the MessageLog instance is destroyed.
51///
52/// # Examples
53/// ```
54/// use anyhow::Result;
55/// use selium_log::{config::LogConfig, message::Message, MessageLog};
56/// use std::sync::Arc;
57///
58/// const MESSAGE_VERSION: u32 = 1;
59///
60/// #[tokio::main]
61/// async fn main() -> Result<()> {
62/// let config = LogConfig::from_path("path/to/segments/dir");
63/// let log = MessageLog::open(Arc::new(config)).await?;
64/// let message = Message::single(b"Hello, world!", MESSAGE_VERSION);
65///
66/// log.write(message).await?;
67/// log.flush().await?;
68/// let slice = log.read_slice(0, None).await?;
69///
70/// if let Some(mut iter) = slice.messages() {
71/// let next = iter.next().await?;
72/// println!("{next:?}")
73/// }
///
75/// Ok(())
76/// }
77/// ```
#[derive(Debug)]
pub struct MessageLog {
    // The list of segments (hot + read-only), shared behind an async RwLock.
    segments: SharedSegmentList,
    // Log configuration, shared with the background Flusher/Cleaner tasks.
    config: SharedLogConfig,
    // Signals the Flusher task to reset its flush interval after a manual flush.
    flush_interrupt: mpsc::Sender<()>,
    // Owned handles to the background tasks; held for the lifetime of the log so
    // the tasks are gracefully terminated when this MessageLog is dropped.
    _flusher: Arc<FlusherTask>,
    _cleaner: Arc<CleanerTask>,
}
86
87impl MessageLog {
88 /// Opens a message log at the segments directory configured in the provided `config`
89 /// argument.
90 ///
91 /// If the log directory does not yet exist, it will be created.
92 ///
93 /// Existing segments will be loaded from the filesystem, and provisioned as a
94 /// [SegmentList] instance. If no segments exist in the log directory,
95 /// a single "hot" segment will be created.
96 ///
97 /// The Flusher and Cleaner asynchronous tasks will also be started, and will run in the background.
98 ///
99 /// # Errors
100 /// - Returns [LogError::CreateLogsDirectory] if an error occurs while creating the log directory.
101 /// - Returns Err if an error occurs while constructing the [SegmentList].
102 pub async fn open(config: SharedLogConfig) -> Result<Self> {
103 fs::create_dir_all(&config.segments_path)
104 .await
105 .map_err(LogError::CreateLogsDirectory)?;
106
107 let segments = load_segments(config.clone()).await?;
108 let (_flusher, flush_interrupt) = FlusherTask::start(config.clone(), segments.clone());
109 let _cleaner = CleanerTask::start(config.clone(), segments.clone());
110
111 Ok(Self {
112 segments,
113 config,
114 flush_interrupt,
115 _flusher,
116 _cleaner,
117 })
118 }
119
120 /// Writes the provided [Message] to the current hot segment.
121 ///
122 /// If the hot segment is at full capacity following the write, the current hot segment
123 /// will be flushed, and a new segment will be created and designated as the hot segment in
124 /// its place. Otherwise, the `writes_since_last_flush` field is incremented by 1.
125 ///
126 /// # Errors
127 /// - Returns [LogError::SegmentListEmpty] if there are no segments in the list yet.
128 /// - Returns Err if writing to the hot segment fails.
129 /// - Returns Err if the segment is full, and the current hot segment fails to flush.
130 /// - Returns Err if the segment is full, and the new hot segment fails to be created.
131 pub async fn write(&self, message: Message) -> Result<()> {
132 self.segments.write().await.write(message).await?;
133 self.try_flush().await?;
134 Ok(())
135 }
136
137 /// Reads a range of messages from a segment identified by the provided offset.
138 ///
139 /// Returns an empty [MessageSlice] if the provided offset is greater than the total
140 /// amount of entries in the log.
141 ///
142 /// # Params
143 /// * `offset` - The starting offset, used to locate the segment and relative offset.
144 /// * `limit` - An optional message read limit.
145 pub async fn read_slice(&self, offset: u64, limit: Option<u64>) -> Result<MessageSlice> {
146 self.segments.read().await.read_slice(offset, limit).await
147 }
148
149 /// Flushes the hot segment to the filesystem.
150 /// The Flusher task interval will also be interrupted and reset.
151 ///
152 /// # Errors
153 /// - Returns Err if the hot segment fails to flush.
154 pub async fn flush(&self) -> Result<()> {
155 self.segments.write().await.flush().await?;
156 let _ = self.flush_interrupt.send(()).await;
157 Ok(())
158 }
159
160 /// Retrieves the total number of entries in the log, based on the `end_offset` in the current
161 /// hot segment.
162 pub async fn number_of_entries(&self) -> u64 {
163 self.segments.read().await.number_of_entries()
164 }
165
166 async fn try_flush(&self) -> Result<()> {
167 let segments = self.segments.read().await;
168
169 let should_flush = match self.config.flush_policy.number_of_writes {
170 Some(number_of_writes) => segments.writes_since_last_flush() >= number_of_writes,
171 None => false,
172 };
173
174 drop(segments);
175
176 if should_flush {
177 self.flush().await?;
178 }
179
180 Ok(())
181 }
182}
183
/// Returns `true` when `path` refers to an existing regular file whose
/// extension is `index` — i.e. a segment index file.
fn is_index_file(path: &Path) -> bool {
    let has_index_extension = path
        .extension()
        .map_or(false, |ext| ext == OsStr::new("index"));
    path.is_file() && has_index_extension
}
187
188async fn get_offsets(path: impl AsRef<Path>) -> Result<Vec<u64>> {
189 let mut offsets = vec![];
190 let mut entries = fs::read_dir(&path).await.map_err(LogError::LoadSegments)?;
191
192 while let Some(entry) = entries.next_entry().await? {
193 let path = entry.path();
194
195 if is_index_file(&path) {
196 if let Some(offset) = path
197 .file_name()
198 .and_then(OsStr::to_str)
199 .map(|s| s.trim_end_matches(".index"))
200 .and_then(|s| s.parse().ok())
201 {
202 offsets.push(offset);
203 }
204 }
205 }
206
207 Ok(offsets)
208}
209
210async fn load_segments(config: SharedLogConfig) -> Result<SharedSegmentList> {
211 let path = &config.segments_path;
212 let offsets = get_offsets(path).await?;
213
214 let segments = if !offsets.is_empty() {
215 SegmentList::from_offsets(&offsets, config.clone()).await?
216 } else {
217 SegmentList::create(config.clone()).await?
218 };
219
220 let shared_segments = Arc::new(RwLock::new(segments));
221 Ok(shared_segments)
222}