// SPDX-License-Identifier: Apache-2.0 OR MIT
//! Plan 18 step 14 — operational APIs for SQLite-backed stores.
//!
//! Surfaces SQLite primitives (`VACUUM`, `incremental_vacuum`, `ANALYZE`,
//! `wal_checkpoint(TRUNCATE)`, `VACUUM INTO`) and audit-log retention through
//! the public [`Memory`] handle. Each call routes through the configured
//! [`crate::metadata::MetadataStore`]; non-SQLite back-ends short-circuit with
//! [`Error::Config`].
//!
//! These APIs are additive — no existing public surface changes.
use std::path::Path;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use crate::error::{Error, Result};
use crate::handle::Memory;
/// Options accepted by [`Memory::vacuum`].
///
/// All fields have defaults (`max_pages = None`, `include_fts = false`), and
/// `#[serde(default)]` lets a serialized form omit any of them.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[serde(default)]
#[non_exhaustive]
pub struct VacuumOpts {
    /// Page budget for an incremental vacuum.
    ///
    /// `Some(n)` makes [`Memory::vacuum`] issue
    /// `PRAGMA incremental_vacuum(n)`; `None` makes it issue a full `VACUUM`.
    /// The incremental form is intended for live stores where holding the
    /// write lock for a full rebuild is unacceptable.
    pub max_pages: Option<u32>,
    /// Whether to also compact the FTS5 segments.
    ///
    /// When `true`, [`Memory::vacuum`] additionally runs
    /// `INSERT INTO memory_fts(memory_fts) VALUES('optimize')` and the same
    /// statement against `summary_fts`. A table that does not exist yet is
    /// skipped rather than reported as an error.
    pub include_fts: bool,
}
/// Result returned by [`Memory::vacuum`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[non_exhaustive]
pub struct VacuumReport {
    /// Number of pages reclaimed.
    ///
    /// Best-effort: computed as the drop in `PRAGMA freelist_count` between
    /// the start and the end of the call, floored at zero.
    pub pages_reclaimed: u64,
    /// Estimated bytes freed, i.e. `pages_reclaimed * page_size`
    /// (best-effort, saturating on overflow).
    pub bytes_freed: u64,
    /// Wall-clock time the call took, in milliseconds.
    pub duration_ms: u64,
}
/// Result returned by [`Memory::backup_to`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[non_exhaustive]
pub struct BackupReport {
    /// Size in bytes of the destination file once the copy finished.
    ///
    /// Reported as `0` when the destination's metadata cannot be read.
    pub bytes_copied: u64,
    /// Wall-clock time the call took, in milliseconds.
    pub duration_ms: u64,
}
/// Result returned by [`Memory::checkpoint`].
#[derive(Clone, Debug, Deserialize, Serialize)]
#[non_exhaustive]
pub struct CheckpointReport {
    /// Number of WAL pages written back to the main DB.
    ///
    /// Taken from the "checkpointed" column of the `PRAGMA wal_checkpoint`
    /// result row; values that do not fit a `u64` (e.g. negative) become `0`.
    pub pages_written: u64,
    /// Number of WAL frames truncated.
    ///
    /// Taken from the "log" column of the `PRAGMA wal_checkpoint` result
    /// row; values that do not fit a `u32` become `0`.
    pub frames_truncated: u32,
}
impl Memory {
fn pool(&self) -> Result<&sqlx::SqlitePool> {
self.inner.metadata.sqlite_pool().ok_or_else(|| {
Error::Config("operational APIs require a SQLite metadata back-end".into())
})
}
/// Compact the underlying SQLite database.
///
/// With `opts.max_pages = None` this runs the blocking `VACUUM` statement,
/// which rebuilds the entire DB and reclaims every freelist page; with
/// `Some(n)` it runs `PRAGMA incremental_vacuum(n)` which only frees up
/// to `n` pages and is safe to run on a live store. Non-SQLite back-ends
/// return [`Error::Config`].
pub async fn vacuum(&self, opts: VacuumOpts) -> Result<VacuumReport> {
let pool = self.pool()?;
let started = Instant::now();
let page_size: u64 = sqlx::query_scalar::<_, i64>("PRAGMA page_size")
.fetch_one(pool)
.await
.map_err(|e| Error::metadata("ops.vacuum.page_size", e))?
.max(0) as u64;
let pages_before: u64 = sqlx::query_scalar::<_, i64>("PRAGMA freelist_count")
.fetch_one(pool)
.await
.map_err(|e| Error::metadata("ops.vacuum.freelist_before", e))?
.max(0) as u64;
match opts.max_pages {
None => {
sqlx::query("VACUUM")
.execute(pool)
.await
.map_err(|e| Error::metadata("ops.vacuum.full", e))?;
}
Some(n) => {
let stmt = format!("PRAGMA incremental_vacuum({n})");
sqlx::query(&stmt)
.execute(pool)
.await
.map_err(|e| Error::metadata("ops.vacuum.incremental", e))?;
}
}
if opts.include_fts {
// Plan 18: `create_indices_if_missing` is called from
// `Builder::open` on every fresh handle once `embedder_dims`
// is known, so the FTS5 tables exist for any store that has
// either an embedder bound or one prior append. The early-life
// caller-provided + no-append-yet case is the only window where
// they may not exist; tolerate just that. Every other sqlx
// error (I/O, corruption) propagates.
for stmt in [
"INSERT INTO memory_fts(memory_fts) VALUES('optimize')",
"INSERT INTO summary_fts(summary_fts) VALUES('optimize')",
] {
if let Err(e) = sqlx::query(stmt).execute(pool).await {
let msg = e.to_string();
let is_missing_table = match &e {
sqlx::Error::Database(db) => db
.message()
.to_ascii_lowercase()
.starts_with("no such table"),
_ => false,
} || msg.to_ascii_lowercase().contains("no such table");
if !is_missing_table {
return Err(Error::metadata("ops.vacuum.fts_optimize", e));
}
}
}
}
let pages_after: u64 = sqlx::query_scalar::<_, i64>("PRAGMA freelist_count")
.fetch_one(pool)
.await
.map_err(|e| Error::metadata("ops.vacuum.freelist_after", e))?
.max(0) as u64;
let pages_reclaimed = pages_before.saturating_sub(pages_after);
Ok(VacuumReport {
pages_reclaimed,
bytes_freed: pages_reclaimed.saturating_mul(page_size),
duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
})
}
/// Run `ANALYZE` so the SQLite query planner has fresh statistics.
pub async fn analyze(&self) -> Result<()> {
let pool = self.pool()?;
sqlx::query("ANALYZE")
.execute(pool)
.await
.map_err(|e| Error::metadata("ops.analyze", e))?;
Ok(())
}
/// Copy the live database to `path` using `VACUUM INTO`. The destination
/// MUST NOT exist; SQLite errors otherwise. Holds a read lock on the
/// source for the copy duration but does not block readers.
pub async fn backup_to(&self, path: &Path) -> Result<BackupReport> {
let pool = self.pool()?;
let started = Instant::now();
// Quote per SQLite literal rules.
let path_str = path.to_string_lossy().replace('\'', "''");
let stmt = format!("VACUUM INTO '{path_str}'");
sqlx::query(&stmt)
.execute(pool)
.await
.map_err(|e| Error::metadata("ops.backup_to", e))?;
let bytes_copied = std::fs::metadata(path).map(|m| m.len()).unwrap_or(0);
Ok(BackupReport {
bytes_copied,
duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
})
}
/// Run `PRAGMA wal_checkpoint(TRUNCATE)` to flush the WAL into the main
/// DB and shrink the WAL file to zero. Returns the number of pages
/// written and frames truncated.
pub async fn checkpoint(&self) -> Result<CheckpointReport> {
let pool = self.pool()?;
// wal_checkpoint returns three integers: busy, log, checkpointed.
let row = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
.fetch_one(pool)
.await
.map_err(|e| Error::metadata("ops.checkpoint", e))?;
// Column 1 is "log" (frames in WAL), column 2 is "checkpointed" (frames written).
let log: i64 = row.try_get(1).unwrap_or(0);
let checkpointed: i64 = row.try_get(2).unwrap_or(0);
Ok(CheckpointReport {
pages_written: u64::try_from(checkpointed).unwrap_or(0),
frames_truncated: u32::try_from(log).unwrap_or(0),
})
}
/// Delete audit-log entries older than `retain`. Returns the number of
/// rows removed.
pub async fn compact_audit_log(&self, retain: Duration) -> Result<u64> {
let pool = self.pool()?;
let now_ms = self.inner.clock.now_ms();
let cutoff = now_ms.saturating_sub(i64::try_from(retain.as_millis()).unwrap_or(i64::MAX));
let res = sqlx::query("DELETE FROM audit_log WHERE ts < ?")
.bind(cutoff)
.execute(pool)
.await
.map_err(|e| Error::metadata("ops.compact_audit_log", e))?;
Ok(res.rows_affected())
}
}