Skip to main content

grafeo_engine/database/
admin.rs

1//! Admin, introspection, and diagnostic operations for GrafeoDB.
2
3use std::path::Path;
4
5use grafeo_common::utils::error::Result;
6
7impl super::GrafeoDB {
8    // =========================================================================
9    // ADMIN API: Counts
10    // =========================================================================
11
12    /// Returns the number of nodes in the database.
13    #[must_use]
14    pub fn node_count(&self) -> usize {
15        self.store.node_count()
16    }
17
18    /// Returns the number of edges in the database.
19    #[must_use]
20    pub fn edge_count(&self) -> usize {
21        self.store.edge_count()
22    }
23
24    /// Returns the number of distinct labels in the database.
25    #[must_use]
26    pub fn label_count(&self) -> usize {
27        self.store.label_count()
28    }
29
30    /// Returns the number of distinct property keys in the database.
31    #[must_use]
32    pub fn property_key_count(&self) -> usize {
33        self.store.property_key_count()
34    }
35
36    /// Returns the number of distinct edge types in the database.
37    #[must_use]
38    pub fn edge_type_count(&self) -> usize {
39        self.store.edge_type_count()
40    }
41
42    // =========================================================================
43    // ADMIN API: Introspection
44    // =========================================================================
45
46    /// Returns true if this database is backed by a file (persistent).
47    ///
48    /// In-memory databases return false.
49    #[must_use]
50    pub fn is_persistent(&self) -> bool {
51        self.config.path.is_some()
52    }
53
54    /// Returns the database file path, if persistent.
55    ///
56    /// In-memory databases return None.
57    #[must_use]
58    pub fn path(&self) -> Option<&Path> {
59        self.config.path.as_deref()
60    }
61
62    /// Returns high-level database information.
63    ///
64    /// Includes node/edge counts, persistence status, and mode (LPG/RDF).
65    #[must_use]
66    pub fn info(&self) -> crate::admin::DatabaseInfo {
67        crate::admin::DatabaseInfo {
68            mode: crate::admin::DatabaseMode::Lpg,
69            node_count: self.store.node_count(),
70            edge_count: self.store.edge_count(),
71            is_persistent: self.is_persistent(),
72            path: self.config.path.clone(),
73            wal_enabled: self.config.wal_enabled,
74            version: env!("CARGO_PKG_VERSION").to_string(),
75        }
76    }
77
78    /// Returns detailed database statistics.
79    ///
80    /// Includes counts, memory usage, and index information.
81    #[must_use]
82    pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
83        #[cfg(feature = "wal")]
84        let disk_bytes = self.config.path.as_ref().and_then(|p| {
85            if p.exists() {
86                Self::calculate_disk_usage(p).ok()
87            } else {
88                None
89            }
90        });
91        #[cfg(not(feature = "wal"))]
92        let disk_bytes: Option<usize> = None;
93
94        crate::admin::DatabaseStats {
95            node_count: self.store.node_count(),
96            edge_count: self.store.edge_count(),
97            label_count: self.store.label_count(),
98            edge_type_count: self.store.edge_type_count(),
99            property_key_count: self.store.property_key_count(),
100            index_count: self.catalog.index_count(),
101            memory_bytes: self.buffer_manager.allocated(),
102            disk_bytes,
103        }
104    }
105
106    /// Calculates total disk usage for the database directory.
107    #[cfg(feature = "wal")]
108    fn calculate_disk_usage(path: &Path) -> Result<usize> {
109        let mut total = 0usize;
110        if path.is_dir() {
111            for entry in std::fs::read_dir(path)? {
112                let entry = entry?;
113                let metadata = entry.metadata()?;
114                if metadata.is_file() {
115                    total += metadata.len() as usize;
116                } else if metadata.is_dir() {
117                    total += Self::calculate_disk_usage(&entry.path())?;
118                }
119            }
120        }
121        Ok(total)
122    }
123
124    /// Returns schema information (labels, edge types, property keys).
125    ///
126    /// For LPG mode, returns label and edge type information.
127    /// For RDF mode, returns predicate and named graph information.
128    #[must_use]
129    pub fn schema(&self) -> crate::admin::SchemaInfo {
130        let labels = self
131            .store
132            .all_labels()
133            .into_iter()
134            .map(|name| crate::admin::LabelInfo {
135                name: name.clone(),
136                count: self.store.nodes_with_label(&name).count(),
137            })
138            .collect();
139
140        let edge_types = self
141            .store
142            .all_edge_types()
143            .into_iter()
144            .map(|name| crate::admin::EdgeTypeInfo {
145                name: name.clone(),
146                count: self.store.edges_with_type(&name).count(),
147            })
148            .collect();
149
150        let property_keys = self.store.all_property_keys();
151
152        crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
153            labels,
154            edge_types,
155            property_keys,
156        })
157    }
158
159    /// Returns RDF schema information.
160    ///
161    /// Only available when the RDF feature is enabled.
162    #[cfg(feature = "rdf")]
163    #[must_use]
164    pub fn rdf_schema(&self) -> crate::admin::SchemaInfo {
165        let stats = self.rdf_store.stats();
166
167        let predicates = self
168            .rdf_store
169            .predicates()
170            .into_iter()
171            .map(|predicate| {
172                let count = self.rdf_store.triples_with_predicate(&predicate).len();
173                crate::admin::PredicateInfo {
174                    iri: predicate.to_string(),
175                    count,
176                }
177            })
178            .collect();
179
180        crate::admin::SchemaInfo::Rdf(crate::admin::RdfSchemaInfo {
181            predicates,
182            named_graphs: Vec::new(), // Named graphs not yet implemented in RdfStore
183            subject_count: stats.subject_count,
184            object_count: stats.object_count,
185        })
186    }
187
188    /// Returns detailed information about all indexes.
189    #[must_use]
190    pub fn list_indexes(&self) -> Vec<crate::admin::IndexInfo> {
191        self.catalog
192            .all_indexes()
193            .into_iter()
194            .map(|def| {
195                let label_name = self
196                    .catalog
197                    .get_label_name(def.label)
198                    .unwrap_or_else(|| "?".into());
199                let prop_name = self
200                    .catalog
201                    .get_property_key_name(def.property_key)
202                    .unwrap_or_else(|| "?".into());
203                crate::admin::IndexInfo {
204                    name: format!("idx_{}_{}", label_name, prop_name),
205                    index_type: format!("{:?}", def.index_type),
206                    target: format!("{}:{}", label_name, prop_name),
207                    unique: false,
208                    cardinality: None,
209                    size_bytes: None,
210                }
211            })
212            .collect()
213    }
214
215    /// Validates database integrity.
216    ///
217    /// Checks for:
218    /// - Dangling edge references (edges pointing to non-existent nodes)
219    /// - Internal index consistency
220    ///
221    /// Returns a list of errors and warnings. Empty errors = valid.
222    #[must_use]
223    pub fn validate(&self) -> crate::admin::ValidationResult {
224        let mut result = crate::admin::ValidationResult::default();
225
226        // Check for dangling edge references
227        for edge in self.store.all_edges() {
228            if self.store.get_node(edge.src).is_none() {
229                result.errors.push(crate::admin::ValidationError {
230                    code: "DANGLING_SRC".to_string(),
231                    message: format!(
232                        "Edge {} references non-existent source node {}",
233                        edge.id.0, edge.src.0
234                    ),
235                    context: Some(format!("edge:{}", edge.id.0)),
236                });
237            }
238            if self.store.get_node(edge.dst).is_none() {
239                result.errors.push(crate::admin::ValidationError {
240                    code: "DANGLING_DST".to_string(),
241                    message: format!(
242                        "Edge {} references non-existent destination node {}",
243                        edge.id.0, edge.dst.0
244                    ),
245                    context: Some(format!("edge:{}", edge.id.0)),
246                });
247            }
248        }
249
250        // Add warnings for potential issues
251        if self.store.node_count() > 0 && self.store.edge_count() == 0 {
252            result.warnings.push(crate::admin::ValidationWarning {
253                code: "NO_EDGES".to_string(),
254                message: "Database has nodes but no edges".to_string(),
255                context: None,
256            });
257        }
258
259        result
260    }
261
262    /// Returns WAL (Write-Ahead Log) status.
263    ///
264    /// Returns None if WAL is not enabled.
265    #[must_use]
266    pub fn wal_status(&self) -> crate::admin::WalStatus {
267        #[cfg(feature = "wal")]
268        if let Some(ref wal) = self.wal {
269            return crate::admin::WalStatus {
270                enabled: true,
271                path: self.config.path.as_ref().map(|p| p.join("wal")),
272                size_bytes: wal.size_bytes(),
273                record_count: wal.record_count() as usize,
274                last_checkpoint: wal.last_checkpoint_timestamp(),
275                current_epoch: self.store.current_epoch().as_u64(),
276            };
277        }
278
279        crate::admin::WalStatus {
280            enabled: false,
281            path: None,
282            size_bytes: 0,
283            record_count: 0,
284            last_checkpoint: None,
285            current_epoch: self.store.current_epoch().as_u64(),
286        }
287    }
288
289    /// Forces a WAL checkpoint.
290    ///
291    /// Flushes all pending WAL records to the main storage.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the checkpoint fails.
296    pub fn wal_checkpoint(&self) -> Result<()> {
297        #[cfg(feature = "wal")]
298        if let Some(ref wal) = self.wal {
299            let epoch = self.store.current_epoch();
300            let tx_id = self
301                .tx_manager
302                .last_assigned_tx_id()
303                .unwrap_or_else(|| self.tx_manager.begin());
304            wal.checkpoint(tx_id, epoch)?;
305            wal.sync()?;
306        }
307        Ok(())
308    }
309
310    // =========================================================================
311    // ADMIN API: Change Data Capture
312    // =========================================================================
313
314    /// Returns the full change history for an entity (node or edge).
315    ///
316    /// Events are ordered chronologically by epoch.
317    ///
318    /// # Errors
319    ///
320    /// Returns an error if the CDC feature is not enabled.
321    #[cfg(feature = "cdc")]
322    pub fn history(
323        &self,
324        entity_id: impl Into<crate::cdc::EntityId>,
325    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
326        Ok(self.cdc_log.history(entity_id.into()))
327    }
328
329    /// Returns change events for an entity since the given epoch.
330    #[cfg(feature = "cdc")]
331    pub fn history_since(
332        &self,
333        entity_id: impl Into<crate::cdc::EntityId>,
334        since_epoch: grafeo_common::types::EpochId,
335    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
336        Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
337    }
338
339    /// Returns all change events across all entities in an epoch range.
340    #[cfg(feature = "cdc")]
341    pub fn changes_between(
342        &self,
343        start_epoch: grafeo_common::types::EpochId,
344        end_epoch: grafeo_common::types::EpochId,
345    ) -> Result<Vec<crate::cdc::ChangeEvent>> {
346        Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
347    }
348}