Skip to main content

grafeo_engine/database/
admin.rs

1//! Admin, introspection, and diagnostic operations for GrafeoDB.
2
3use std::path::Path;
4
5use grafeo_common::utils::error::Result;
6
7impl super::GrafeoDB {
8    // =========================================================================
9    // ADMIN API: Counts
10    // =========================================================================
11
12    /// Returns the number of nodes in the database.
13    #[must_use]
14    pub fn node_count(&self) -> usize {
15        self.lpg_store().node_count()
16    }
17
18    /// Returns the number of edges in the database.
19    #[must_use]
20    pub fn edge_count(&self) -> usize {
21        self.lpg_store().edge_count()
22    }
23
24    /// Returns the number of distinct labels in the database.
25    #[must_use]
26    pub fn label_count(&self) -> usize {
27        self.lpg_store().label_count()
28    }
29
30    /// Returns the number of distinct property keys in the database.
31    #[must_use]
32    pub fn property_key_count(&self) -> usize {
33        self.lpg_store().property_key_count()
34    }
35
36    /// Returns the number of distinct edge types in the database.
37    #[must_use]
38    pub fn edge_type_count(&self) -> usize {
39        self.lpg_store().edge_type_count()
40    }
41
42    // =========================================================================
43    // ADMIN API: Introspection
44    // =========================================================================
45
46    /// Returns true if this database is backed by a file (persistent).
47    ///
48    /// In-memory databases return false.
49    #[must_use]
50    pub fn is_persistent(&self) -> bool {
51        self.config.path.is_some()
52    }
53
54    /// Returns the database file path, if persistent.
55    ///
56    /// In-memory databases return None.
57    #[must_use]
58    pub fn path(&self) -> Option<&Path> {
59        self.config.path.as_deref()
60    }
61
62    /// Returns high-level database information.
63    ///
64    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
65    #[must_use]
66    pub fn info(&self) -> crate::admin::DatabaseInfo {
67        crate::admin::DatabaseInfo {
68            mode: crate::admin::DatabaseMode::Lpg,
69            node_count: self.lpg_store().node_count(),
70            edge_count: self.lpg_store().edge_count(),
71            is_persistent: self.is_persistent(),
72            path: self.config.path.clone(),
73            wal_enabled: self.config.wal_enabled,
74            version: env!("CARGO_PKG_VERSION").to_string(),
75            features: {
76                let mut f = vec!["gql".into()];
77                if cfg!(feature = "cypher") {
78                    f.push("cypher".into());
79                }
80                if cfg!(feature = "sparql") {
81                    f.push("sparql".into());
82                }
83                if cfg!(feature = "gremlin") {
84                    f.push("gremlin".into());
85                }
86                if cfg!(feature = "graphql") {
87                    f.push("graphql".into());
88                }
89                if cfg!(feature = "sql-pgq") {
90                    f.push("sql-pgq".into());
91                }
92                if cfg!(feature = "triple-store") {
93                    f.push("rdf".into());
94                }
95                if cfg!(feature = "algos") {
96                    f.push("algos".into());
97                }
98                if cfg!(feature = "vector-index") {
99                    f.push("vector-index".into());
100                }
101                if cfg!(feature = "text-index") {
102                    f.push("text-index".into());
103                }
104                if cfg!(feature = "hybrid-search") {
105                    f.push("hybrid-search".into());
106                }
107                if cfg!(feature = "cdc") {
108                    f.push("cdc".into());
109                }
110                f
111            },
112        }
113    }
114
115    /// Returns a hierarchical memory usage breakdown.
116    ///
117    /// Walks all internal structures (store, indexes, MVCC chains, caches,
118    /// string pools, buffer manager) and returns estimated heap bytes for each.
119    /// Safe to call concurrently with queries.
120    #[must_use]
121    pub fn memory_usage(&self) -> crate::memory_usage::MemoryUsage {
122        use crate::memory_usage::{BufferManagerMemory, CacheMemory, MemoryUsage};
123        use grafeo_common::memory::MemoryRegion;
124
125        let (store, indexes, mvcc, string_pool) = self.lpg_store().memory_breakdown();
126
127        let (parsed_bytes, optimized_bytes, cached_plan_count) =
128            self.query_cache.heap_memory_bytes();
129        let mut caches = CacheMemory {
130            parsed_plan_cache_bytes: parsed_bytes,
131            optimized_plan_cache_bytes: optimized_bytes,
132            cached_plan_count,
133            ..Default::default()
134        };
135        caches.compute_total();
136
137        let bm_stats = self.buffer_manager.stats();
138        let buffer_manager = BufferManagerMemory {
139            budget_bytes: bm_stats.budget,
140            allocated_bytes: bm_stats.total_allocated,
141            graph_storage_bytes: bm_stats.region_usage(MemoryRegion::GraphStorage),
142            index_buffers_bytes: bm_stats.region_usage(MemoryRegion::IndexBuffers),
143            execution_buffers_bytes: bm_stats.region_usage(MemoryRegion::ExecutionBuffers),
144            spill_staging_bytes: bm_stats.region_usage(MemoryRegion::SpillStaging),
145        };
146
147        let mut usage = MemoryUsage {
148            store,
149            indexes,
150            mvcc,
151            caches,
152            string_pool,
153            buffer_manager,
154            ..Default::default()
155        };
156
157        #[cfg(feature = "triple-store")]
158        {
159            use crate::memory_usage::RdfMemory;
160            let (
161                triple_count,
162                triples_and_indexes_bytes,
163                term_dictionary_bytes,
164                ring_index_bytes,
165                named_graph_count,
166            ) = self.rdf_store.heap_memory_bytes();
167            usage.rdf = RdfMemory {
168                triple_count,
169                triples_and_indexes_bytes,
170                term_dictionary_bytes,
171                ring_index_bytes,
172                named_graph_count,
173                total_bytes: 0,
174            };
175            usage.rdf.compute_total();
176        }
177
178        #[cfg(feature = "cdc")]
179        {
180            use crate::memory_usage::CdcMemory;
181            let (total_bytes, entity_count, event_count) = self.cdc_log.heap_memory_bytes();
182            usage.cdc = CdcMemory {
183                total_bytes,
184                entity_count,
185                event_count,
186            };
187        }
188
189        usage.compute_total();
190        usage
191    }
192
193    /// Returns detailed database statistics.
194    ///
195    /// Includes counts, memory usage, and index information.
196    #[must_use]
197    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
198        #[cfg(feature = "wal")]
199        let disk_bytes = self.config.path.as_ref().and_then(|p| {
200            if p.exists() {
201                Self::calculate_disk_usage(p).ok()
202            } else {
203                None
204            }
205        });
206        #[cfg(not(feature = "wal"))]
207        let disk_bytes: Option<usize> = None;
208
209        crate::admin::DatabaseStats {
210            node_count: self.lpg_store().node_count(),
211            edge_count: self.lpg_store().edge_count(),
212            label_count: self.lpg_store().label_count(),
213            edge_type_count: self.lpg_store().edge_type_count(),
214            property_key_count: self.lpg_store().property_key_count(),
215            index_count: self.catalog.index_count(),
216            memory_bytes: self.memory_usage().total_bytes,
217            disk_bytes,
218        }
219    }
220
221    /// Calculates total disk usage for the database directory.
222    #[cfg(feature = "wal")]
223    fn calculate_disk_usage(path: &Path) -> Result<usize> {
224        let mut total = 0usize;
225        if path.is_dir() {
226            for entry in std::fs::read_dir(path)? {
227                let entry = entry?;
228                let metadata = entry.metadata()?;
229                if metadata.is_file() {
230                    // reason: file sizes fit usize on 64-bit targets
231                    #[allow(clippy::cast_possible_truncation)]
232                    let file_len = metadata.len() as usize;
233                    total += file_len;
234                } else if metadata.is_dir() {
235                    total += Self::calculate_disk_usage(&entry.path())?;
236                }
237            }
238        }
239        Ok(total)
240    }
241
242    /// Returns schema information (labels, edge types, property keys).
243    ///
244    /// For LPG mode, returns label and edge type information.
245    /// For RDF mode, returns predicate and named graph information.
246    #[must_use]
247    pub fn schema(&self) -> crate::admin::SchemaInfo {
248        let labels = self
249            .lpg_store()
250            .all_labels()
251            .into_iter()
252            .map(|name| crate::admin::LabelInfo {
253                name: name.clone(),
254                count: self.lpg_store().nodes_with_label(&name).count(),
255            })
256            .collect();
257
258        let edge_types = self
259            .lpg_store()
260            .all_edge_types()
261            .into_iter()
262            .map(|name| crate::admin::EdgeTypeInfo {
263                name: name.clone(),
264                count: self.lpg_store().edges_with_type(&name).count(),
265            })
266            .collect();
267
268        let property_keys = self.lpg_store().all_property_keys();
269
270        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
271            labels,
272            edge_types,
273            property_keys,
274        })
275    }
276
277    /// Returns detailed information about all indexes.
278    #[must_use]
279    pub fn list_indexes(&self) -> Vec<crate::admin::IndexInfo> {
280        self.catalog
281            .all_indexes()
282            .into_iter()
283            .map(|def| {
284                let label_name = self
285                    .catalog
286                    .get_label_name(def.label)
287                    .unwrap_or_else(|| "?".into());
288                let prop_name = self
289                    .catalog
290                    .get_property_key_name(def.property_key)
291                    .unwrap_or_else(|| "?".into());
292                crate::admin::IndexInfo {
293                    name: format!("idx_{}_{}", label_name, prop_name),
294                    index_type: format!("{:?}", def.index_type),
295                    target: format!("{}:{}", label_name, prop_name),
296                    unique: false,
297                    cardinality: None,
298                    size_bytes: None,
299                }
300            })
301            .collect()
302    }
303
304    /// Validates database integrity.
305    ///
306    /// Checks for:
307    /// - Dangling edge references (edges pointing to non-existent nodes)
308    /// - Internal index consistency
309    ///
310    /// Returns a list of errors and warnings. Empty errors = valid.
311    #[must_use]
312    pub fn validate(&self) -> crate::admin::ValidationResult {
313        let mut result = crate::admin::ValidationResult::default();
314
315        // Check for dangling edge references
316        for edge in self.lpg_store().all_edges() {
317            if self.lpg_store().get_node(edge.src).is_none() {
318                result.errors.push(crate::admin::ValidationError {
319                    code: "DANGLING_SRC".to_string(),
320                    message: format!(
321                        "Edge {} references non-existent source node {}",
322                        edge.id.0, edge.src.0
323                    ),
324                    context: Some(format!("edge:{}", edge.id.0)),
325                });
326            }
327            if self.lpg_store().get_node(edge.dst).is_none() {
328                result.errors.push(crate::admin::ValidationError {
329                    code: "DANGLING_DST".to_string(),
330                    message: format!(
331                        "Edge {} references non-existent destination node {}",
332                        edge.id.0, edge.dst.0
333                    ),
334                    context: Some(format!("edge:{}", edge.id.0)),
335                });
336            }
337        }
338
339        // Add warnings for potential issues
340        if self.lpg_store().node_count() > 0 && self.lpg_store().edge_count() == 0 {
341            result.warnings.push(crate::admin::ValidationWarning {
342                code: "NO_EDGES".to_string(),
343                message: "Database has nodes but no edges".to_string(),
344                context: None,
345            });
346        }
347
348        result
349    }
350
351    /// Returns WAL (Write-Ahead Log) status.
352    ///
353    /// Returns None if WAL is not enabled.
354    #[must_use]
355    pub fn wal_status(&self) -> crate::admin::WalStatus {
356        #[cfg(feature = "wal")]
357        if let Some(ref wal) = self.wal {
358            return crate::admin::WalStatus {
359                enabled: true,
360                path: self.config.path.as_ref().map(|p| p.join("wal")),
361                size_bytes: wal.size_bytes(),
362                // reason: WAL record count fits usize on 64-bit targets
363                #[allow(clippy::cast_possible_truncation)]
364                record_count: wal.record_count() as usize,
365                last_checkpoint: wal.last_checkpoint_timestamp(),
366                current_epoch: self.lpg_store().current_epoch().as_u64(),
367            };
368        }
369
370        crate::admin::WalStatus {
371            enabled: false,
372            path: None,
373            size_bytes: 0,
374            record_count: 0,
375            last_checkpoint: None,
376            current_epoch: self.lpg_store().current_epoch().as_u64(),
377        }
378    }
379
380    /// Forces a WAL checkpoint.
381    ///
382    /// Flushes all pending WAL records to the main storage.
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if the checkpoint fails.
387    pub fn wal_checkpoint(&self) -> Result<()> {
388        // Read-only databases have no WAL and the on-disk file is already a
389        // valid snapshot: nothing to checkpoint.
390        if self.read_only {
391            return Ok(());
392        }
393
394        #[cfg(feature = "wal")]
395        if let Some(ref wal) = self.wal {
396            let epoch = self.lpg_store().current_epoch();
397            let transaction_id = self
398                .transaction_manager
399                .last_assigned_transaction_id()
400                .unwrap_or_else(|| self.transaction_manager.begin());
401            wal.checkpoint(transaction_id, epoch)?;
402            wal.sync()?;
403        }
404
405        // Flush all sections to .grafeo file (explicit checkpoint)
406        #[cfg(feature = "grafeo-file")]
407        if let Some(ref fm) = self.file_manager {
408            let _ = self.checkpoint_to_file(fm, super::flush::FlushReason::Explicit)?;
409        }
410
411        Ok(())
412    }
413
414    // =========================================================================
415    // ADMIN API: Change Data Capture
416    // =========================================================================
417
418    /// Returns whether CDC is enabled by default for new sessions.
419    #[cfg(feature = "cdc")]
420    #[must_use]
421    pub fn is_cdc_enabled(&self) -> bool {
422        self.cdc_active()
423    }
424
425    /// Sets whether CDC is enabled by default for new sessions.
426    ///
427    /// Does not affect sessions that were already created.
428    #[cfg(feature = "cdc")]
429    pub fn set_cdc_enabled(&self, enabled: bool) {
430        self.cdc_enabled
431            .store(enabled, std::sync::atomic::Ordering::Relaxed);
432    }
433
434    /// Returns the full change history for an entity (node or edge).
435    ///
436    /// Events are ordered chronologically by epoch.
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the CDC feature is not enabled.
441    #[cfg(feature = "cdc")]
442    pub fn history(
443        &self,
444        entity_id: impl Into<crate::cdc::EntityId>,
445    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
446        Ok(self.cdc_log.history(entity_id.into()))
447    }
448
449    /// Returns change events for an entity since the given epoch.
450    ///
451    /// # Errors
452    ///
453    /// Currently infallible, but returns `Result` for forward compatibility.
454    #[cfg(feature = "cdc")]
455    pub fn history_since(
456        &self,
457        entity_id: impl Into<crate::cdc::EntityId>,
458        since_epoch: grafeo_common::types::EpochId,
459    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
460        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
461    }
462
463    /// Returns all change events across all entities in an epoch range.
464    ///
465    /// # Errors
466    ///
467    /// Currently infallible, but returns `Result` for forward compatibility.
468    #[cfg(feature = "cdc")]
469    pub fn changes_between(
470        &self,
471        start_epoch: grafeo_common::types::EpochId,
472        end_epoch: grafeo_common::types::EpochId,
473    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
474        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
475    }
476}