raindb/writers.rs
// Copyright (c) 2021 Google LLC
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.
6
/*!
This module provides abstractions that represent threads performing write operations, specifically
the [`Writer`] struct.
*/
11
12use std::ptr;
13
14use parking_lot::{Condvar, Mutex, MutexGuard};
15
16use crate::db::GuardedDbFields;
17use crate::errors::RainDBResult;
18use crate::Batch;
19
20/**
21Mutable fields within a [`Writer`].
22
23These will be wrapped by a mutex to provide interior mutability without need to keep a lock
24around the entire parent [`Writer`] object.
25*/
26struct WriterInner {
27 /// Whether the requested operation was completed regardless of whether if failed or succeeded.
28 pub operation_completed: bool,
29
30 /**
31 The result of the operation.
32
33 This field must be populated if `operation_completed` was set to `true`. This field is mostly
34 used to indicate status to a writer if its operation was part of a group commit.
35 */
36 pub operation_result: Option<RainDBResult<()>>,
37}
38
39/**
40A thread requesting a write operation.
41
42When multiple threads request a write operation, RainDB will queue up the threads so that the
43writes occur serially. Threads waiting in the queue are parked and then signalled to wake up when it
44is their turn to perform the requested operation.
45*/
46pub(crate) struct Writer {
47 /**
48 The batch of operations this writer is requesting to be performed.
49
50 This is `None` if a client is forcing a compaction check.
51 */
52 maybe_batch: Option<Batch>,
53
54 /// Whether the write operations should be synchronously flushed to disk.
55 synchronous_write: bool,
56
57 /**
58 Fields in a writer that need to be mutable.
59
60 The mutex is for interior mutability where only an immutable reference is needed to make
61 changes and a lock doesn't have to be placed around the entire writer.
62 */
63 inner: Mutex<WriterInner>,
64
65 /**
66 A condition variable to signal the thread to park or wake-up to perform it's requested
67 operation.
68 */
69 thread_signaller: Condvar,
70}
71
72impl PartialEq for Writer {
73 fn eq(&self, other: &Self) -> bool {
74 self.maybe_batch == other.maybe_batch
75 && self.synchronous_write == other.synchronous_write
76 && ptr::eq(&self.inner, &other.inner)
77 }
78}
79
80/// Public methods
81impl Writer {
82 /// Create a new instance of [`Writer`].
83 pub fn new(maybe_batch: Option<Batch>, synchronous_write: bool) -> Self {
84 let inner = WriterInner {
85 operation_completed: false,
86 operation_result: None,
87 };
88
89 Self {
90 maybe_batch,
91 synchronous_write,
92 inner: Mutex::new(inner),
93 thread_signaller: Condvar::new(),
94 }
95 }
96
97 /// Whether the writer should perform synchronous writes.
98 pub fn is_synchronous_write(&self) -> bool {
99 self.synchronous_write
100 }
101
102 /// Get a reference to the operations this writer needs to perform.
103 pub fn maybe_batch(&self) -> Option<&Batch> {
104 self.maybe_batch.as_ref()
105 }
106
107 /// Parks the thread while it waits for its turn to perform its operation.
108 pub fn wait_for_turn(&self, database_mutex_guard: &mut MutexGuard<GuardedDbFields>) {
109 self.thread_signaller.wait(database_mutex_guard)
110 }
111
112 /**
113 Notify that writer that it is potentially it's turn to perform its operation.
114
115 If the operation was already completed as part of a group commit, the thread will return.
116 */
117 pub fn notify_writer(&self) -> bool {
118 self.thread_signaller.notify_one()
119 }
120
121 /**
122 Return true if the operation is complete. Otherwise, false.
123
124 This will attempt to get a lock on the inner fields.
125 */
126 pub fn is_operation_complete(&self) -> bool {
127 self.inner.lock().operation_completed
128 }
129
130 /**
131 Set whether or not the operation is complete.
132
133 This will attempt to get a lock on the inner fields.
134 */
135 pub fn set_operation_completed(&self, is_complete: bool) -> bool {
136 let mut mutex_guard = self.inner.lock();
137 mutex_guard.operation_completed = is_complete;
138
139 mutex_guard.operation_completed
140 }
141
142 /**
143 Get a copy of the result of the write operation.
144
145 This will attempt to get a lock on the inner fields.
146 */
147 pub fn get_operation_result(&self) -> Option<RainDBResult<()>> {
148 self.inner.lock().operation_result.clone()
149 }
150
151 /**
152 Set the result of the write operation.
153
154 This will attempt to get a lock on the inner fields.
155 */
156 pub fn set_operation_result(&self, operation_result: RainDBResult<()>) {
157 let mut mutex_guard = self.inner.lock();
158 mutex_guard.operation_result = Some(operation_result);
159 }
160}