1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
use cannyls;
use cannyls::device::DeviceHandle;
use fibers::sync::mpsc;
use futures::{Async, Future, Stream};
use raftlog::election::Ballot;
use raftlog::log::{LogIndex, LogPosition, LogPrefix, LogSuffix};
use raftlog::{Error, ErrorKind, Result};
use slog::Logger;
use std::sync::atomic::{self, AtomicUsize};
use trackable::error::ErrorKindExt;

use LocalNodeId;

pub use self::ballot::{LoadBallot, SaveBallot};
pub use self::log::{LoadLog, SaveLog};
pub use self::log_prefix::{LoadLogPrefix, SaveLogPrefix};
pub use self::log_suffix::{LoadLogSuffix, SaveLogSuffix};

mod ballot;
mod log;
mod log_prefix;
mod log_suffix;

// ストレージの初期化処理を直接かするためのグローバル変数.
//
// 初期化時には、スナップ処理や大きなAppendEntriesの処理が入り重いので、
// 並列度を下げるために、これを利用する.
//
// 最終的にはもう少し上手い仕組みを考えたい.
// (個々のRaftノードに独立した仕組みにできるのとベスト)
static INITIALIZATION_LOCK: AtomicUsize = atomic::ATOMIC_USIZE_INIT;

fn acquire_initialization_lock() -> bool {
    INITIALIZATION_LOCK.compare_and_swap(0, 1, atomic::Ordering::SeqCst) == 0
}

fn release_initialization_lock() {
    INITIALIZATION_LOCK.fetch_sub(1, atomic::Ordering::SeqCst);
}

/// Raft用の永続ストレージ実装.
#[derive(Debug)]
pub struct Storage {
    handle: Handle,

    // スナップショット以降のログ領域を保持するバッファ.
    //
    // これは、読み込み速度向上用に用意されているものであり、
    // Raftノードのロード時を除き、末尾部分のログエントリの読み込みは、
    // 常にこのバッファ上から行われることになる.
    //
    // 反対に書き込みに関しては、常に即座に永続ストレージに即座に
    // エントリが保存される.
    // (同時にバッファにも追記が行われるが、エントリがバッファにしか存在しない期間、
    // というものは発生しない)
    log_suffix: LogSuffix,

    event_rx: mpsc::Receiver<Event>,
    event_tx: mpsc::Sender<Event>,
    phase: Phase,
}
impl Storage {
    /// 新しい`Storage`インスタンスを生成する.
    pub fn new(logger: Logger, node_id: LocalNodeId, device: DeviceHandle) -> Self {
        let (event_tx, event_rx) = mpsc::channel();
        Storage {
            handle: Handle {
                logger,
                node_id,
                device,
            },
            log_suffix: LogSuffix::default(),
            event_rx,
            event_tx,
            phase: Phase::Started,
        }
    }

    pub(crate) fn logger(&self) -> Logger {
        self.handle.logger.clone()
    }
    pub(crate) fn node_id(&self) -> LocalNodeId {
        self.handle.node_id
    }
    #[cfg(test)]
    pub(crate) fn handle(&self) -> Handle {
        self.handle.clone()
    }
    pub(crate) fn save_ballot(&mut self, ballot: Ballot) -> ballot::SaveBallot {
        ballot::SaveBallot::new(self, ballot)
    }
    pub(crate) fn load_ballot(&mut self) -> ballot::LoadBallot {
        ballot::LoadBallot::new(self)
    }
    pub(crate) fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> LoadLog {
        if let Err(e) = track!(self.poll_and_handle_event()) {
            return LoadLog(log::LoadLogInner::Failed(e));
        }

        // XXX: 全体的に`raftlog`の実装内容に依存しており、あまり良くはない
        let future = if let Some(end) = end {
            // 明示的に終端が指定されている == 初回ロード(ノード起動)時以降のログ読み込み
            if start < self.log_suffix.head.index {
                // バッファ地点以前のエントリが必要 => 存在しないのでスナップショットを返す
                let future = log_prefix::LoadLogPrefix::new(self);
                log::LoadLogInner::LoadLogPrefix {
                    next: None,
                    event_tx: None,
                    future,
                }
            } else {
                // バッファ内から取得
                let copy_from_buffer = || {
                    track_assert!(
                        start <= self.log_suffix.tail().index,
                        ErrorKind::InvalidInput
                    );
                    track_assert!(end <= self.log_suffix.tail().index, ErrorKind::InvalidInput);
                    track!(self.log_suffix.slice(start, end))
                };
                match copy_from_buffer() {
                    Err(e) => log::LoadLogInner::Failed(e),
                    Ok(suffix) => log::LoadLogInner::CopyLogSuffix(suffix),
                }
            }
        } else if start.as_u64() == 0 {
            // 「終端が未指定」かつ「開始地点が0」は、ノード起動時の最初のログ読み込みを示している
            // => まずスナップショットのロードを試みる
            let future = log_prefix::LoadLogPrefix::new(self);
            let next = Some(log_suffix::LoadLogSuffix::new(self));
            log::LoadLogInner::LoadLogPrefix {
                next,
                event_tx: Some(self.event_tx.clone()),
                future,
            }
        } else {
            // 「終端が未指定」かつ「開始地点が0以外」は、
            // ノード起動時かつスナップショットロード以降のログ読み込みを示している
            // => スナップショット以降のログエントリ群を取得する
            assert_eq!(start, self.log_suffix.head.index);
            log::LoadLogInner::LoadLogSuffix(log_suffix::LoadLogSuffix::new(self))
        };
        LoadLog(future)
    }
    pub(crate) fn save_log_suffix(&mut self, suffix: &LogSuffix) -> SaveLog {
        if self.phase != Phase::Initialized {
            // ログ書き込みが発生する、ということは初期化フェーズは抜けたことを意味する
            info!(self.handle.logger, "Initialized");
            if self.phase == Phase::Initializing {
                release_initialization_lock();
                info!(self.handle.logger, "Initialization lock is released");
            }
            self.phase = Phase::Initialized;
        }

        if let Err(e) = track!(self.poll_and_handle_event()) {
            return SaveLog(log::SaveLogInner::Failed(e));
        }

        // ローカルバッファに追記後に、永続化ストレージに保存する.
        //
        // `raftlog`から、このメソッドが返した`Future`が完了して初めて、
        // エントリ群の追記が完了したものとして認識されるので、
        // 先にバッファに追加してしまっても問題は発生しない.
        let future = if let Err(e) = track!(self.append_to_local_buffer(suffix)) {
            log_suffix::SaveLogSuffix::failed(self, e)
        } else {
            log_suffix::SaveLogSuffix::new(self, suffix)
        };
        SaveLog(log::SaveLogInner::Suffix(future))
    }
    pub(crate) fn save_log_prefix(&mut self, prefix: LogPrefix) -> SaveLog {
        if self.phase != Phase::Initialized {
            // ログ書き込みが発生する、ということは初期化フェーズは抜けたことを意味する
            info!(self.handle.logger, "Initialized");
            if self.phase == Phase::Initializing {
                release_initialization_lock();
                info!(self.handle.logger, "Initialization lock is released");
            }
            self.phase = Phase::Initialized;
        }

        let inner = if let Err(e) = track!(self.poll_and_handle_event()) {
            log::SaveLogInner::Failed(e)
        } else {
            log::SaveLogInner::Prefix(log_prefix::SaveLogPrefix::new(self, prefix))
        };
        SaveLog(inner)
    }

    #[allow(clippy::wrong_self_convention)]
    pub(crate) fn is_busy(&mut self) -> bool {
        if self.phase == Phase::Started {
            if acquire_initialization_lock() {
                info!(self.handle.logger, "Initialization lock is acquired");
                self.phase = Phase::Initializing;
                false
            } else {
                true
            }
        } else {
            false
        }
    }

    fn poll_and_handle_event(&mut self) -> Result<()> {
        while let Async::Ready(event) = self.event_rx.poll().expect("Never fails") {
            let event = event.expect("Never fails");
            match event {
                Event::LogPrefixUpdated { new_head } => {
                    track!(self.handle_log_prefix_updated_event(new_head))?;
                }
                Event::LogSuffixLoaded(suffix) => {
                    track!(self.handle_log_suffix_loaded_event(suffix))?;
                }
            }
        }
        Ok(())
    }
    fn handle_log_prefix_updated_event(&mut self, new_head: LogPosition) -> Result<()> {
        // ログの前半部分が更新されたので、それに合わせてバッファを調整する
        info!(
            self.handle.logger,
            "Event::LogPrefixUpdated: {}",
            dump!(self.log_suffix.head, new_head)
        );
        if self.log_suffix.head.index < new_head.index {
            if self.log_suffix.skip_to(new_head.index).is_err() {
                // バッファがカバーする範囲(i.e., ローカルログの範囲)よりも
                // 先の地点のスナップショットがインストールされた
                // => バッファを空にし、先頭地点を設定し直す
                self.log_suffix.head = new_head;
                self.log_suffix.entries.clear();
            }
            track_assert_eq!(
                new_head.index,
                self.log_suffix.head.index,
                ErrorKind::InconsistentState
            );
            if new_head.prev_term != self.log_suffix.head.prev_term {
                self.log_suffix.head.prev_term = new_head.prev_term;
                self.log_suffix.entries.clear();
            }
        }
        Ok(())
    }
    fn handle_log_suffix_loaded_event(&mut self, suffix: LogSuffix) -> Result<()> {
        // ログの接尾部分がストレージから読み込まれたので、バッファに反映する
        info!(
            self.handle.logger,
            "Event::LogSuffixLoaded: {}",
            dump!(suffix.head, suffix.entries.len())
        );
        self.log_suffix = suffix;
        Ok(())
    }
    fn append_to_local_buffer(&mut self, suffix: &LogSuffix) -> Result<()> {
        // ローカルログと`suffix`の領域に重複部分があるかをチェック
        // (未コミット分がロールバックされる可能性もあるので、
        // 必ずしも`suffix`の先端が、ローカルログの末端と一致する必要はない)
        let entries_offset = if self.log_suffix.head.index <= suffix.head.index {
            0
        } else {
            // スナップショットのインストールタイミング次第で、こちらに入ることがある
            self.log_suffix.head.index - suffix.head.index
        };
        track_assert!(
            suffix.head.index <= self.log_suffix.tail().index,
            ErrorKind::InconsistentState,
            "suffix.start={:?}, self.end={:?}",
            suffix.head.index,
            self.log_suffix.tail().index
        );

        // 整合性(prev_termの一致)チェック
        let offset = suffix.head.index + entries_offset - self.log_suffix.head.index;
        let prev_term = if offset == 0 {
            self.log_suffix.head.prev_term
        } else {
            self.log_suffix.entries[offset - 1].term()
        };
        track_assert_eq!(
            suffix.positions().nth(entries_offset).map(|p| p.prev_term),
            Some(prev_term),
            ErrorKind::InconsistentState,
            "suffix.start={:?}, self.start={:?}",
            suffix.positions().nth(entries_offset),
            self.log_suffix.head
        );

        // 末尾の余剰領域を削除(ロールバック)した上で、追記する
        self.log_suffix.entries.truncate(offset);
        self.log_suffix
            .entries
            .extend(suffix.entries.iter().skip(entries_offset).cloned());
        Ok(())
    }
}

#[derive(Debug, Clone)]
pub(crate) struct Handle {
    pub logger: Logger,
    pub node_id: LocalNodeId,
    pub device: DeviceHandle,
}

#[derive(Debug)]
pub(crate) enum Event {
    LogPrefixUpdated { new_head: LogPosition },
    LogSuffixLoaded(LogSuffix),
}

type BoxFuture<T> = Box<Future<Item = T, Error = Error> + Send + 'static>;

fn into_box_future<F>(future: F) -> BoxFuture<F::Item>
where
    F: Future<Error = cannyls::Error> + Send + 'static,
{
    let future = future.map_err(|e| match *e.kind() {
        cannyls::ErrorKind::DeviceBusy => ErrorKind::Busy.takes_over(e).into(),
        cannyls::ErrorKind::InvalidInput => ErrorKind::InvalidInput.takes_over(e).into(),
        cannyls::ErrorKind::Other
        | cannyls::ErrorKind::InconsistentState
        | cannyls::ErrorKind::DeviceTerminated
        | cannyls::ErrorKind::StorageCorrupted
        | cannyls::ErrorKind::StorageFull => ErrorKind::Other.takes_over(e).into(),
    });
    Box::new(future)
}

#[derive(Debug, PartialEq, Eq)]
enum Phase {
    Started,
    Initializing,
    Initialized,
}