1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//! Async engine operations for `GrafeoDB`.
//!
//! These methods offload I/O-bound operations (WAL checkpoint, snapshot writes)
//! to blocking tasks via [`tokio::task::spawn_blocking`], avoiding blocking the
//! tokio runtime. Query execution stays sync (CPU-bound, rayon).
//!
//! All methods take `self: &Arc<Self>` so the `GrafeoDB` can be cheaply cloned
//! into the blocking closure. This matches how `grafeo-server` already holds
//! databases in `Arc<GrafeoDB>`.
use std::sync::Arc;
use grafeo_common::utils::error::{Error, Result};
use super::GrafeoDB;
impl GrafeoDB {
/// Asynchronous WAL checkpoint.
///
/// Offloads the sync [`wal_checkpoint()`](Self::wal_checkpoint) to a blocking
/// task so the tokio runtime is not blocked during fsync.
///
/// Takes `&Arc<Self>` because the blocking closure must be `'static`.
/// This matches the `Arc<GrafeoDB>` pattern used by `grafeo-server`.
///
/// # Errors
///
/// Returns an error if the checkpoint fails or the blocking task panics.
///
/// # Examples
///
/// ```no_run
/// # async fn example() -> grafeo_common::utils::error::Result<()> {
/// use std::sync::Arc;
/// use grafeo_engine::GrafeoDB;
///
/// let db = Arc::new(GrafeoDB::open("./my_db")?);
/// db.async_wal_checkpoint().await?;
/// # Ok(())
/// # }
/// ```
pub async fn async_wal_checkpoint(self: &Arc<Self>) -> Result<()> {
let db = Arc::clone(self);
tokio::task::spawn_blocking(move || db.wal_checkpoint())
.await
.map_err(|e| Error::Internal(format!("async checkpoint task failed: {e}")))?
}
/// Asynchronous snapshot write to the `.grafeo` file.
///
/// Offloads the full snapshot export and file write to a blocking task.
/// This is the most I/O-intensive operation (serialization + fsync) and
/// benefits most from not blocking the tokio runtime.
///
/// Requires the `grafeo-file` feature.
///
/// # Errors
///
/// Returns an error if the snapshot write fails or the blocking task panics.
///
/// # Examples
///
/// ```no_run
/// # async fn example() -> grafeo_common::utils::error::Result<()> {
/// use std::sync::Arc;
/// use grafeo_engine::GrafeoDB;
///
/// let db = Arc::new(GrafeoDB::open("./my_db")?);
/// db.async_write_snapshot().await?;
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "grafeo-file")]
pub async fn async_write_snapshot(self: &Arc<Self>) -> Result<()> {
let db = Arc::clone(self);
tokio::task::spawn_blocking(move || {
let Some(ref fm) = db.file_manager else {
return Err(Error::Internal(
"no file manager configured for snapshot write".to_string(),
));
};
db.checkpoint_to_file(fm, super::flush::FlushReason::Checkpoint)
.map(|_| ())
})
.await
.map_err(|e| Error::Internal(format!("async snapshot task failed: {e}")))?
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn async_wal_checkpoint_in_memory() {
// In-memory databases have no WAL, so this should be a no-op success.
let db = Arc::new(GrafeoDB::new_in_memory());
db.async_wal_checkpoint().await.unwrap();
}
#[tokio::test]
async fn async_wal_checkpoint_with_data() {
let dir = tempfile::tempdir().unwrap();
let db = Arc::new(GrafeoDB::open(dir.path().join("test.grafeo")).unwrap());
// Insert some data
let session = db.session();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
session.execute("INSERT (:Person {name: 'Gus'})").unwrap();
drop(session);
// Async checkpoint should succeed
db.async_wal_checkpoint().await.unwrap();
}
#[tokio::test]
async fn async_wal_checkpoint_does_not_block_runtime() {
// Verify that async_wal_checkpoint yields to the runtime.
// Run a concurrent task alongside the checkpoint.
let db = Arc::new(GrafeoDB::new_in_memory());
let (checkpoint_result, concurrent_result) =
tokio::join!(db.async_wal_checkpoint(), async { 42 },);
checkpoint_result.unwrap();
assert_eq!(concurrent_result, 42);
}
#[cfg(feature = "grafeo-file")]
#[tokio::test]
async fn async_write_snapshot_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.grafeo");
let db = Arc::new(GrafeoDB::open(&path).unwrap());
// Insert data
let session = db.session();
session.execute("INSERT (:Person {name: 'Alix'})").unwrap();
drop(session);
// Write snapshot
db.async_write_snapshot().await.unwrap();
// Verify data survives by opening a fresh instance
drop(db);
let db2 = GrafeoDB::open(&path).unwrap();
let session2 = db2.session();
let result = session2.execute("MATCH (p:Person) RETURN p.name").unwrap();
assert_eq!(result.rows.len(), 1);
}
}