raindb/
writers.rs

1// Copyright (c) 2021 Google LLC
2//
3// Use of this source code is governed by an MIT-style
4// license that can be found in the LICENSE file or at
5// https://opensource.org/licenses/MIT.
6
7/*!
8This module provides abstractions that represent threads performing write operations, specifically
9the [`Writer`] struct.
10*/
11
12use std::ptr;
13
14use parking_lot::{Condvar, Mutex, MutexGuard};
15
16use crate::db::GuardedDbFields;
17use crate::errors::RainDBResult;
18use crate::Batch;
19
20/**
21Mutable fields within a [`Writer`].
22
23These will be wrapped by a mutex to provide interior mutability without need to keep a lock
24around the entire parent [`Writer`] object.
25*/
26struct WriterInner {
27    /// Whether the requested operation was completed regardless of whether if failed or succeeded.
28    pub operation_completed: bool,
29
30    /**
31    The result of the operation.
32
33    This field must be populated if `operation_completed` was set to `true`. This field is mostly
34    used to indicate status to a writer if its operation was part of a group commit.
35    */
36    pub operation_result: Option<RainDBResult<()>>,
37}
38
39/**
40A thread requesting a write operation.
41
42When multiple threads request a write operation, RainDB will queue up the threads so that the
43writes occur serially. Threads waiting in the queue are parked and then signalled to wake up when it
44is their turn to perform the requested operation.
45*/
46pub(crate) struct Writer {
47    /**
48    The batch of operations this writer is requesting to be performed.
49
50    This is `None` if a client is forcing a compaction check.
51    */
52    maybe_batch: Option<Batch>,
53
54    /// Whether the write operations should be synchronously flushed to disk.
55    synchronous_write: bool,
56
57    /**
58    Fields in a writer that need to be mutable.
59
60    The mutex is for interior mutability where only an immutable reference is needed to make
61    changes and a lock doesn't have to be placed around the entire writer.
62    */
63    inner: Mutex<WriterInner>,
64
65    /**
66    A condition variable to signal the thread to park or wake-up to perform it's requested
67    operation.
68    */
69    thread_signaller: Condvar,
70}
71
72impl PartialEq for Writer {
73    fn eq(&self, other: &Self) -> bool {
74        self.maybe_batch == other.maybe_batch
75            && self.synchronous_write == other.synchronous_write
76            && ptr::eq(&self.inner, &other.inner)
77    }
78}
79
80/// Public methods
81impl Writer {
82    /// Create a new instance of [`Writer`].
83    pub fn new(maybe_batch: Option<Batch>, synchronous_write: bool) -> Self {
84        let inner = WriterInner {
85            operation_completed: false,
86            operation_result: None,
87        };
88
89        Self {
90            maybe_batch,
91            synchronous_write,
92            inner: Mutex::new(inner),
93            thread_signaller: Condvar::new(),
94        }
95    }
96
97    /// Whether the writer should perform synchronous writes.
98    pub fn is_synchronous_write(&self) -> bool {
99        self.synchronous_write
100    }
101
102    /// Get a reference to the operations this writer needs to perform.
103    pub fn maybe_batch(&self) -> Option<&Batch> {
104        self.maybe_batch.as_ref()
105    }
106
107    /// Parks the thread while it waits for its turn to perform its operation.
108    pub fn wait_for_turn(&self, database_mutex_guard: &mut MutexGuard<GuardedDbFields>) {
109        self.thread_signaller.wait(database_mutex_guard)
110    }
111
112    /**
113    Notify that writer that it is potentially it's turn to perform its operation.
114
115    If the operation was already completed as part of a group commit, the thread will return.
116    */
117    pub fn notify_writer(&self) -> bool {
118        self.thread_signaller.notify_one()
119    }
120
121    /**
122    Return true if the operation is complete. Otherwise, false.
123
124    This will attempt to get a lock on the inner fields.
125    */
126    pub fn is_operation_complete(&self) -> bool {
127        self.inner.lock().operation_completed
128    }
129
130    /**
131    Set whether or not the operation is complete.
132
133    This will attempt to get a lock on the inner fields.
134    */
135    pub fn set_operation_completed(&self, is_complete: bool) -> bool {
136        let mut mutex_guard = self.inner.lock();
137        mutex_guard.operation_completed = is_complete;
138
139        mutex_guard.operation_completed
140    }
141
142    /**
143    Get a copy of the result of the write operation.
144
145    This will attempt to get a lock on the inner fields.
146    */
147    pub fn get_operation_result(&self) -> Option<RainDBResult<()>> {
148        self.inner.lock().operation_result.clone()
149    }
150
151    /**
152    Set the result of the write operation.
153
154    This will attempt to get a lock on the inner fields.
155    */
156    pub fn set_operation_result(&self, operation_result: RainDBResult<()>) {
157        let mut mutex_guard = self.inner.lock();
158        mutex_guard.operation_result = Some(operation_result);
159    }
160}