1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
//! Administrative and test-only helper methods.
//!
//! Provides internal statistics, test visibility, and database maintenance operations.
use crate::core::error::{PersistenceErrorKind, Result, ResultExt, StorageError};
use crate::core::temporal::Timestamp;
use crate::db::AletheiaDB;
use crate::index::temporal::TemporalIndexes;
use crate::query::planner::Statistics;
#[cfg(test)]
use crate::storage::current::CurrentStorage;
use crate::storage::historical::{HistoricalStats, HistoricalStorage};
use crate::storage::index_persistence::operations::{
persist_temporal_index, persist_vector_indexes,
};
use parking_lot::RwLock;
use std::sync::Arc;
impl AletheiaDB {
/// Return a statistics snapshot for the historical storage layer.
///
/// Takes a read lock on the historical store only long enough to
/// collect the snapshot, then releases it before returning.
#[must_use = "the historical statistics value must be used"]
pub fn historical_stats(&self) -> Result<HistoricalStats> {
    let snapshot = self.historical.read().stats();
    Ok(snapshot)
}
/// Persist all indexes to disk.
///
/// This saves the current state of all indexes (graph, temporal, vector, strings)
/// to disk in the configured persistence directory, then records the safe WAL
/// LSN in a manifest so recovery knows where index state ends and WAL replay begins.
///
/// # Errors
///
/// Returns an error if:
/// - Index persistence is not enabled in configuration
/// - Writing index files fails due to I/O errors
/// - Index serialization fails
///
/// # Example
///
/// ```rust,no_run
/// # use aletheiadb::AletheiaDB;
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let db = AletheiaDB::new()?;
/// // ... add data ...
/// db.persist_indexes()?; // Save indexes to disk
/// # Ok(())
/// # }
/// ```
#[must_use = "this Result must be used; ignoring errors can lead to silent failures"]
pub fn persist_indexes(&self) -> Result<()> {
// The whole sequence runs inside an immediately-invoked closure so that
// every early `?` return still funnels through `record_error_metric()` below.
let result = (|| {
use crate::storage::index_persistence::formats::IndexManifest;
// Warn if background persistence thread has stopped; manual calls keep working.
if self
.persistence_thread_stopped
.load(std::sync::atomic::Ordering::Acquire)
{
eprintln!(
"Warning: Background persistence thread has stopped. \
Automatic persistence is disabled. Manual persist_indexes() calls will still work."
);
}
// Persistence is optional: without a configured manager there is nowhere
// to write, so surface a descriptive error instead of silently succeeding.
let manager = self.persistence_manager.as_ref().ok_or_else(|| {
StorageError::InconsistentState {
reason: "Index persistence not enabled".to_string(),
}
})?;
// Capture current LSN once so all components are stamped consistently.
let current_lsn = self.wal.current_lsn().0;
let tracker = self.persistence_tracker.as_ref();
// String interner must be saved first (dependency for all others).
// Update the string LSN tracker to current_lsn BEFORE calculating safe LSN
// so even if no new strings were added, the tracker reflects current state.
if let Some(tracker) = tracker {
crate::storage::index_persistence::operations::persist_string_interner(
manager,
tracker,
current_lsn,
)?;
} else {
// No tracker configured: save the interner directly, without LSN bookkeeping.
manager.save_string_interner().map_err(|e| {
StorageError::persistence_with_kind(
PersistenceErrorKind::from(&e),
format!("Failed to save string interner: {}", e),
)
})?;
}
// Graph index handles an optional tracker internally, so it runs on both paths.
crate::storage::index_persistence::operations::persist_graph_index(
&self.current,
manager,
tracker,
current_lsn,
)?;
// NOTE(review): vector and temporal indexes are persisted ONLY when an LSN
// tracker is configured — confirm the tracker-less path is meant to skip
// them entirely rather than persist without LSN tracking.
if let Some(tracker) = tracker {
persist_vector_indexes(&self.current, manager, Some(tracker), current_lsn)?;
persist_temporal_index(
&self.historical,
&self.temporal_indexes,
manager,
tracker,
current_lsn,
)?;
}
// Record WAL position for future replay coordination.
// Safe LSN = min of all components; since we just persisted everything, current_lsn is safe.
let safe_lsn = tracker
.map(|t| t.get_safe_manifest_lsn())
.unwrap_or(current_lsn);
let manifest = IndexManifest::new(safe_lsn);
manager.save_manifest(&manifest).map_err(|e| {
StorageError::persistence_with_kind(
PersistenceErrorKind::from(&e),
format!("Failed to save manifest: {}", e),
)
})?;
Ok(())
})();
// Record an error metric (if any) before handing the result to the caller.
result.record_error_metric()
}
/// Test-only accessor for the underlying current storage.
///
/// Compiled only in test builds; lets integration tests inspect the
/// internal `CurrentStorage` to verify state after operations.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn storage(&self) -> &Arc<CurrentStorage> {
    &self.current
}
/// Test-only accessor for the current WAL Log Sequence Number.
///
/// Useful when testing index persistence, where correctness depends on
/// LSN coordination with the WAL.
///
/// **Warning**: Exposes internal implementation details; use only in tests
/// (the `__test_` prefix and `#[doc(hidden)]` signal this intent).
///
/// # Returns
///
/// The current LSN from the WAL system.
///
/// # Example
///
/// ```rust,no_run
/// # use aletheiadb::{AletheiaDB, PropertyMap};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let db = AletheiaDB::new()?;
/// # let properties = PropertyMap::new();
/// db.create_node("Person", properties)?;
/// let lsn = db.__test_current_wal_lsn();
/// assert!(lsn > 0); // LSN advances after operations
/// # Ok(())
/// # }
/// ```
#[doc(hidden)]
pub fn __test_current_wal_lsn(&self) -> u64 {
    let lsn = self.wal.current_lsn();
    lsn.0
}
/// Test-only accessor for the current transaction timestamp.
///
/// Reads the internal transaction clock so integration tests can verify
/// temporal behavior.
#[doc(hidden)]
pub fn __test_current_timestamp(&self) -> Timestamp {
    let guard = self.current_timestamp.lock().unwrap();
    *guard
}
/// Test-only accessor for the internal `HistoricalStorage`.
///
/// Public so integration tests outside the crate can reach it, but hidden
/// from docs and given the `__test_` prefix to discourage production use.
///
/// **Warning**: Exposes internal implementation details; use only in tests.
#[doc(hidden)]
pub fn __test_historical_storage(&self) -> &Arc<RwLock<HistoricalStorage>> {
    &self.historical
}
/// Test-only accessor for the temporal indexes.
///
/// Lets performance tests confirm the temporal indexes are populated and
/// query them directly. Hidden from docs; intended for tests only.
#[doc(hidden)]
pub fn __test_temporal_indexes(&self) -> &Arc<TemporalIndexes> {
    &self.temporal_indexes
}
/// Test-only accessor for adaptive over-fetch statistics of a label.
///
/// Delegates to the current storage and yields the
/// `(search_count, total_candidates, total_results)` triple recorded for
/// `label`, or `None` if no searches have run yet. Used by tests to confirm
/// that adaptive learning updates its counters.
///
/// **Warning**: Exposes internal implementation details; use only in tests.
///
/// # Returns
///
/// `Some((search_count, total_candidates, total_results))` when statistics
/// exist for the label, `None` otherwise.
///
/// # Example
///
/// ```rust,no_run
/// # use aletheiadb::AletheiaDB;
/// # use aletheiadb::index::vector::HnswConfig;
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let db = AletheiaDB::new()?;
/// # let config = HnswConfig::default();
/// db.vector_index("embedding").hnsw(config).enable()?;
/// // ... create nodes and perform searches ...
/// let (count, candidates, results) = db.__test_get_filter_stats("Person").unwrap();
/// assert_eq!(count, 10); // 10 searches performed
/// # Ok(())
/// # }
/// ```
#[doc(hidden)]
pub fn __test_get_filter_stats(&self, label: &str) -> Option<(u64, u64, u64)> {
    let stats = self.current.get_filter_stats(label);
    stats
}
/// Access the shared query optimization statistics.
///
/// The statistics drive cost-based query planning and are cached across
/// queries. They refresh automatically when needed; force a refresh with
/// [`refresh_statistics`](Self::refresh_statistics).
///
/// # Returns
///
/// A reference to the shared statistics object.
pub fn statistics(&self) -> &Arc<Statistics> {
    &self.stats
}
/// Refresh query optimization statistics from current storage.
///
/// This collects fresh statistics about node counts, edge counts, label
/// cardinalities, and other metrics used for cost-based query optimization.
/// Call this method after significant schema changes or data modifications
/// to ensure the query planner has accurate information.
///
/// Statistics are automatically refreshed lazily on first query, so this
/// method is typically only needed for benchmarking or after bulk imports.
///
/// # Example
///
/// ```rust,no_run
/// # use aletheiadb::{AletheiaDB, PropertyMap};
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let db = AletheiaDB::new()?;
/// # let documents: Vec<PropertyMap> = vec![];
/// // After bulk import
/// for props in documents {
/// db.create_node("Document", props)?;
/// }
///
/// // Refresh statistics for optimal query planning
/// db.refresh_statistics();
///
/// // Now queries will use accurate statistics
/// # let query = db.query().build();
/// let results = db.execute_query(query)?;
/// # Ok(())
/// # }
/// ```
pub fn refresh_statistics(&self) {
    // Snapshot basic cardinalities from the live storage layer.
    let node_count = self.current.node_count();
    let edge_count = self.current.edge_count();
    let vector_count = self.current.vector_count();
    let label_counts = self.current.label_counts();
    // Estimate the average delta chain length from historical storage;
    // the planner uses it to cost temporal lookups. The read lock is
    // released as soon as the stats snapshot is taken.
    let historical_stats = self.historical.read().stats();
    let total_versions =
        historical_stats.total_node_versions + historical_stats.total_edge_versions;
    let total_anchors =
        historical_stats.node_anchor_count + historical_stats.edge_anchor_count;
    let avg_delta_chain = if total_anchors > 0 {
        // With `interval` versions per anchor, average reconstruction
        // depth is roughly (interval - 1) / 2. Clamp at 0.0 so sparse
        // histories (fewer versions than anchors) can never feed a
        // negative depth into the cost model.
        let interval = total_versions as f64 / total_anchors as f64;
        ((interval - 1.0) / 2.0).max(0.0)
    } else {
        // Default estimate if historical storage is empty or has no anchors
        // (assumes anchor_interval=10, so avg depth ~5).
        5.0
    };
    self.stats.refresh(
        node_count,
        edge_count,
        vector_count,
        label_counts,
        avg_delta_chain,
    );
}
/// Invalidate the cached query optimization statistics.
///
/// Use after schema changes: the next query that needs statistics will
/// trigger a lazy re-collection.
pub fn invalidate_statistics(&self) {
    self.stats.invalidate()
}
}