Skip to main content

grafeo_engine/database/
admin.rs

//! Admin, introspection, and diagnostic operations for GrafeoDB.
2
3use std::path::Path;
4
5use grafeo_common::utils::error::Result;
6
7impl super::GrafeoDB {
    // =========================================================================
    // ADMIN API: Counts
    // =========================================================================
11
12    /// Returns the number of nodes in the database.
13    #[must_use]
14    pub fn node_count(&self) -> usize {
15        self.store.node_count()
16    }
17
18    /// Returns the number of edges in the database.
19    #[must_use]
20    pub fn edge_count(&self) -> usize {
21        self.store.edge_count()
22    }
23
24    /// Returns the number of distinct labels in the database.
25    #[must_use]
26    pub fn label_count(&self) -> usize {
27        self.store.label_count()
28    }
29
30    /// Returns the number of distinct property keys in the database.
31    #[must_use]
32    pub fn property_key_count(&self) -> usize {
33        self.store.property_key_count()
34    }
35
36    /// Returns the number of distinct edge types in the database.
37    #[must_use]
38    pub fn edge_type_count(&self) -> usize {
39        self.store.edge_type_count()
40    }
41
    // =========================================================================
    // ADMIN API: Introspection
    // =========================================================================
45
46    /// Returns true if this database is backed by a file (persistent).
47    ///
48    /// In-memory databases return false.
49    #[must_use]
50    pub fn is_persistent(&self) -> bool {
51        self.config.path.is_some()
52    }
53
54    /// Returns the database file path, if persistent.
55    ///
56    /// In-memory databases return None.
57    #[must_use]
58    pub fn path(&self) -> Option<&Path> {
59        self.config.path.as_deref()
60    }
61
62    /// Returns high-level database information.
63    ///
64    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
65    #[must_use]
66    pub fn info(&self) -> crate::admin::DatabaseInfo {
67        crate::admin::DatabaseInfo {
68            mode: crate::admin::DatabaseMode::Lpg,
69            node_count: self.store.node_count(),
70            edge_count: self.store.edge_count(),
71            is_persistent: self.is_persistent(),
72            path: self.config.path.clone(),
73            wal_enabled: self.config.wal_enabled,
74            version: env!("CARGO_PKG_VERSION").to_string(),
75        }
76    }
77
78    /// Returns a hierarchical memory usage breakdown.
79    ///
80    /// Walks all internal structures (store, indexes, MVCC chains, caches,
81    /// string pools, buffer manager) and returns estimated heap bytes for each.
82    /// Safe to call concurrently with queries.
83    #[must_use]
84    pub fn memory_usage(&self) -> crate::memory_usage::MemoryUsage {
85        use crate::memory_usage::{BufferManagerMemory, CacheMemory, MemoryUsage};
86        use grafeo_common::memory::MemoryRegion;
87
88        let (store, indexes, mvcc, string_pool) = self.store.memory_breakdown();
89
90        let (parsed_bytes, optimized_bytes, cached_plan_count) =
91            self.query_cache.heap_memory_bytes();
92        let mut caches = CacheMemory {
93            parsed_plan_cache_bytes: parsed_bytes,
94            optimized_plan_cache_bytes: optimized_bytes,
95            cached_plan_count,
96            ..Default::default()
97        };
98        caches.compute_total();
99
100        let bm_stats = self.buffer_manager.stats();
101        let buffer_manager = BufferManagerMemory {
102            budget_bytes: bm_stats.budget,
103            allocated_bytes: bm_stats.total_allocated,
104            graph_storage_bytes: bm_stats.region_usage(MemoryRegion::GraphStorage),
105            index_buffers_bytes: bm_stats.region_usage(MemoryRegion::IndexBuffers),
106            execution_buffers_bytes: bm_stats.region_usage(MemoryRegion::ExecutionBuffers),
107            spill_staging_bytes: bm_stats.region_usage(MemoryRegion::SpillStaging),
108        };
109
110        let mut usage = MemoryUsage {
111            store,
112            indexes,
113            mvcc,
114            caches,
115            string_pool,
116            buffer_manager,
117            ..Default::default()
118        };
119        usage.compute_total();
120        usage
121    }
122
123    /// Returns detailed database statistics.
124    ///
125    /// Includes counts, memory usage, and index information.
126    #[must_use]
127    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
128        #[cfg(feature = "wal")]
129        let disk_bytes = self.config.path.as_ref().and_then(|p| {
130            if p.exists() {
131                Self::calculate_disk_usage(p).ok()
132            } else {
133                None
134            }
135        });
136        #[cfg(not(feature = "wal"))]
137        let disk_bytes: Option<usize> = None;
138
139        crate::admin::DatabaseStats {
140            node_count: self.store.node_count(),
141            edge_count: self.store.edge_count(),
142            label_count: self.store.label_count(),
143            edge_type_count: self.store.edge_type_count(),
144            property_key_count: self.store.property_key_count(),
145            index_count: self.catalog.index_count(),
146            memory_bytes: self.buffer_manager.allocated(),
147            disk_bytes,
148        }
149    }
150
/// Calculates the total on-disk size, in bytes, of the database at `path`.
///
/// - If `path` is a regular file, its own length is returned, so a
///   database stored as a single file is not reported as 0 bytes.
/// - If `path` is a directory, the lengths of all regular files beneath it
///   are summed, recursing into subdirectories.
/// - Any other path, including one that does not exist, yields `Ok(0)`.
///
/// Sizes saturate at `usize::MAX` instead of wrapping or truncating, which
/// matters on targets where `usize` is narrower than `u64`.
///
/// # Errors
///
/// Propagates any I/O error raised while reading metadata or listing a
/// directory.
#[cfg(feature = "wal")]
fn calculate_disk_usage(path: &Path) -> Result<usize> {
    // Clamp before narrowing so an oversized file cannot silently truncate.
    fn clamp_len(len: u64) -> usize {
        len.min(usize::MAX as u64) as usize
    }

    if path.is_file() {
        return Ok(clamp_len(std::fs::metadata(path)?.len()));
    }

    let mut total = 0usize;
    if path.is_dir() {
        for entry in std::fs::read_dir(path)? {
            let entry = entry?;
            // `DirEntry::metadata` does not follow symlinks, so symlinked
            // entries are neither files nor directories here and are skipped.
            let metadata = entry.metadata()?;
            if metadata.is_file() {
                total = total.saturating_add(clamp_len(metadata.len()));
            } else if metadata.is_dir() {
                total = total.saturating_add(Self::calculate_disk_usage(&entry.path())?);
            }
        }
    }
    Ok(total)
}
168
169    /// Returns schema information (labels, edge types, property keys).
170    ///
171    /// For LPG mode, returns label and edge type information.
172    /// For RDF mode, returns predicate and named graph information.
173    #[must_use]
174    pub fn schema(&self) -> crate::admin::SchemaInfo {
175        let labels = self
176            .store
177            .all_labels()
178            .into_iter()
179            .map(|name| crate::admin::LabelInfo {
180                name: name.clone(),
181                count: self.store.nodes_with_label(&name).count(),
182            })
183            .collect();
184
185        let edge_types = self
186            .store
187            .all_edge_types()
188            .into_iter()
189            .map(|name| crate::admin::EdgeTypeInfo {
190                name: name.clone(),
191                count: self.store.edges_with_type(&name).count(),
192            })
193            .collect();
194
195        let property_keys = self.store.all_property_keys();
196
197        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
198            labels,
199            edge_types,
200            property_keys,
201        })
202    }
203
/// Describes the RDF side of the database.
///
/// Each predicate known to the RDF store is listed with the number of
/// triples that use it. Subject and object counts are taken from the
/// store's statistics. `named_graphs` is always empty.
///
/// Compiled only when the `rdf` feature is enabled.
#[cfg(feature = "rdf")]
#[must_use]
pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
    use crate::admin::{PredicateInfo, RdfSchemaInfo, SchemaInfo};

    let rdf = &self.rdf_store;
    let stats = rdf.stats();

    let predicates = rdf
        .predicates()
        .into_iter()
        .map(|pred| PredicateInfo {
            // Count first, then render the IRI, as both only borrow `pred`.
            count: rdf.triples_with_predicate(&pred).len(),
            iri: pred.to_string(),
        })
        .collect();

    SchemaInfo::Rdf(RdfSchemaInfo {
        predicates,
        named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
        subject_count: stats.subject_count,
        object_count: stats.object_count,
    })
}
232
233    /// Returns detailed information about all indexes.
234    #[must_use]
235    pub fn list_indexes(&self) -> Vec<crate::admin::IndexInfo> {
236        self.catalog
237            .all_indexes()
238            .into_iter()
239            .map(|def| {
240                let label_name = self
241                    .catalog
242                    .get_label_name(def.label)
243                    .unwrap_or_else(|| "?".into());
244                let prop_name = self
245                    .catalog
246                    .get_property_key_name(def.property_key)
247                    .unwrap_or_else(|| "?".into());
248                crate::admin::IndexInfo {
249                    name: format!("idx_{}_{}", label_name, prop_name),
250                    index_type: format!("{:?}", def.index_type),
251                    target: format!("{}:{}", label_name, prop_name),
252                    unique: false,
253                    cardinality: None,
254                    size_bytes: None,
255                }
256            })
257            .collect()
258    }
259
260    /// Validates database integrity.
261    ///
262    /// Checks for:
263    /// - Dangling edge references (edges pointing to non-existent nodes)
264    /// - Internal index consistency
265    ///
266    /// Returns a list of errors and warnings. Empty errors = valid.
267    #[must_use]
268    pub fn validate(&self) -> crate::admin::ValidationResult {
269        let mut result = crate::admin::ValidationResult::default();
270
271        // Check for dangling edge references
272        for edge in self.store.all_edges() {
273            if self.store.get_node(edge.src).is_none() {
274                result.errors.push(crate::admin::ValidationError {
275                    code: "DANGLING_SRC".to_string(),
276                    message: format!(
277                        "Edge {} references non-existent source node {}",
278                        edge.id.0, edge.src.0
279                    ),
280                    context: Some(format!("edge:{}", edge.id.0)),
281                });
282            }
283            if self.store.get_node(edge.dst).is_none() {
284                result.errors.push(crate::admin::ValidationError {
285                    code: "DANGLING_DST".to_string(),
286                    message: format!(
287                        "Edge {} references non-existent destination node {}",
288                        edge.id.0, edge.dst.0
289                    ),
290                    context: Some(format!("edge:{}", edge.id.0)),
291                });
292            }
293        }
294
295        // Add warnings for potential issues
296        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
297            result.warnings.push(crate::admin::ValidationWarning {
298                code: "NO_EDGES".to_string(),
299                message: "Database has nodes but no edges".to_string(),
300                context: None,
301            });
302        }
303
304        result
305    }
306
307    /// Returns WAL (Write-Ahead Log) status.
308    ///
309    /// Returns None if WAL is not enabled.
310    #[must_use]
311    pub fn wal_status(&self) -> crate::admin::WalStatus {
312        #[cfg(feature = "wal")]
313        if let Some(ref wal) = self.wal {
314            return crate::admin::WalStatus {
315                enabled: true,
316                path: self.config.path.as_ref().map(|p| p.join("wal")),
317                size_bytes: wal.size_bytes(),
318                record_count: wal.record_count() as usize,
319                last_checkpoint: wal.last_checkpoint_timestamp(),
320                current_epoch: self.store.current_epoch().as_u64(),
321            };
322        }
323
324        crate::admin::WalStatus {
325            enabled: false,
326            path: None,
327            size_bytes: 0,
328            record_count: 0,
329            last_checkpoint: None,
330            current_epoch: self.store.current_epoch().as_u64(),
331        }
332    }
333
334    /// Forces a WAL checkpoint.
335    ///
336    /// Flushes all pending WAL records to the main storage.
337    ///
338    /// # Errors
339    ///
340    /// Returns an error if the checkpoint fails.
341    pub fn wal_checkpoint(&self) -> Result<()> {
342        #[cfg(feature = "wal")]
343        if let Some(ref wal) = self.wal {
344            let epoch = self.store.current_epoch();
345            let transaction_id = self
346                .transaction_manager
347                .last_assigned_transaction_id()
348                .unwrap_or_else(|| self.transaction_manager.begin());
349            wal.checkpoint(transaction_id, epoch)?;
350            wal.sync()?;
351        }
352
353        // For single-file format: flush snapshot to .grafeo file
354        #[cfg(feature = "grafeo-file")]
355        if let Some(ref fm) = self.file_manager {
356            self.checkpoint_to_file(fm)?;
357        }
358
359        Ok(())
360    }
361
    // =========================================================================
    // ADMIN API: Change Data Capture
    // =========================================================================
365
/// Fetches the recorded change history for one entity (node or edge).
///
/// The events are returned exactly as the CDC log provides them.
/// Compiled only when the `cdc` feature is enabled.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history(entity);
    Ok(events)
}
380
/// Fetches the change events for one entity, starting from `since_epoch`.
///
/// The events are returned exactly as the CDC log provides them.
/// Compiled only when the `cdc` feature is enabled.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: grafeo_common::types::EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let entity = entity_id.into();
    let events = self.cdc_log.history_since(entity, since_epoch);
    Ok(events)
}
390
/// Fetches the change events of all entities for the given epoch range.
///
/// Both bounds are handed to the CDC log unchanged, and the events are
/// returned exactly as the log provides them. Compiled only when the `cdc`
/// feature is enabled.
///
/// # Errors
///
/// This implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: grafeo_common::types::EpochId,
    end_epoch: grafeo_common::types::EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
400}