1use std::path::Path;
4
5use grafeo_common::utils::error::Result;
6
7impl super::GrafeoDB {
8 #[must_use]
14 pub fn node_count(&self) -> usize {
15 self.lpg_store().node_count()
16 }
17
18 #[must_use]
20 pub fn edge_count(&self) -> usize {
21 self.lpg_store().edge_count()
22 }
23
24 #[must_use]
26 pub fn label_count(&self) -> usize {
27 self.lpg_store().label_count()
28 }
29
30 #[must_use]
32 pub fn property_key_count(&self) -> usize {
33 self.lpg_store().property_key_count()
34 }
35
36 #[must_use]
38 pub fn edge_type_count(&self) -> usize {
39 self.lpg_store().edge_type_count()
40 }
41
42 #[must_use]
50 pub fn is_persistent(&self) -> bool {
51 self.config.path.is_some()
52 }
53
54 #[must_use]
58 pub fn path(&self) -> Option<&Path> {
59 self.config.path.as_deref()
60 }
61
62 #[must_use]
66 pub fn info(&self) -> crate::admin::DatabaseInfo {
67 crate::admin::DatabaseInfo {
68 mode: crate::admin::DatabaseMode::Lpg,
69 node_count: self.lpg_store().node_count(),
70 edge_count: self.lpg_store().edge_count(),
71 is_persistent: self.is_persistent(),
72 path: self.config.path.clone(),
73 wal_enabled: self.config.wal_enabled,
74 version: env!("CARGO_PKG_VERSION").to_string(),
75 features: {
76 let mut f = vec!["gql".into()];
77 if cfg!(feature = "cypher") {
78 f.push("cypher".into());
79 }
80 if cfg!(feature = "sparql") {
81 f.push("sparql".into());
82 }
83 if cfg!(feature = "gremlin") {
84 f.push("gremlin".into());
85 }
86 if cfg!(feature = "graphql") {
87 f.push("graphql".into());
88 }
89 if cfg!(feature = "sql-pgq") {
90 f.push("sql-pgq".into());
91 }
92 if cfg!(feature = "triple-store") {
93 f.push("rdf".into());
94 }
95 if cfg!(feature = "algos") {
96 f.push("algos".into());
97 }
98 if cfg!(feature = "vector-index") {
99 f.push("vector-index".into());
100 }
101 if cfg!(feature = "text-index") {
102 f.push("text-index".into());
103 }
104 if cfg!(feature = "hybrid-search") {
105 f.push("hybrid-search".into());
106 }
107 if cfg!(feature = "cdc") {
108 f.push("cdc".into());
109 }
110 f
111 },
112 }
113 }
114
115 #[must_use]
121 pub fn memory_usage(&self) -> crate::memory_usage::MemoryUsage {
122 use crate::memory_usage::{BufferManagerMemory, CacheMemory, MemoryUsage};
123 use grafeo_common::memory::MemoryRegion;
124
125 let (store, indexes, mvcc, string_pool) = self.lpg_store().memory_breakdown();
126
127 let (parsed_bytes, optimized_bytes, cached_plan_count) =
128 self.query_cache.heap_memory_bytes();
129 let mut caches = CacheMemory {
130 parsed_plan_cache_bytes: parsed_bytes,
131 optimized_plan_cache_bytes: optimized_bytes,
132 cached_plan_count,
133 ..Default::default()
134 };
135 caches.compute_total();
136
137 let bm_stats = self.buffer_manager.stats();
138 let buffer_manager = BufferManagerMemory {
139 budget_bytes: bm_stats.budget,
140 allocated_bytes: bm_stats.total_allocated,
141 graph_storage_bytes: bm_stats.region_usage(MemoryRegion::GraphStorage),
142 index_buffers_bytes: bm_stats.region_usage(MemoryRegion::IndexBuffers),
143 execution_buffers_bytes: bm_stats.region_usage(MemoryRegion::ExecutionBuffers),
144 spill_staging_bytes: bm_stats.region_usage(MemoryRegion::SpillStaging),
145 };
146
147 let mut usage = MemoryUsage {
148 store,
149 indexes,
150 mvcc,
151 caches,
152 string_pool,
153 buffer_manager,
154 ..Default::default()
155 };
156 usage.compute_total();
157 usage
158 }
159
160 #[must_use]
164 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
165 #[cfg(feature = "wal")]
166 let disk_bytes = self.config.path.as_ref().and_then(|p| {
167 if p.exists() {
168 Self::calculate_disk_usage(p).ok()
169 } else {
170 None
171 }
172 });
173 #[cfg(not(feature = "wal"))]
174 let disk_bytes: Option<usize> = None;
175
176 crate::admin::DatabaseStats {
177 node_count: self.lpg_store().node_count(),
178 edge_count: self.lpg_store().edge_count(),
179 label_count: self.lpg_store().label_count(),
180 edge_type_count: self.lpg_store().edge_type_count(),
181 property_key_count: self.lpg_store().property_key_count(),
182 index_count: self.catalog.index_count(),
183 memory_bytes: self.buffer_manager.allocated(),
184 disk_bytes,
185 }
186 }
187
188 #[cfg(feature = "wal")]
190 fn calculate_disk_usage(path: &Path) -> Result<usize> {
191 let mut total = 0usize;
192 if path.is_dir() {
193 for entry in std::fs::read_dir(path)? {
194 let entry = entry?;
195 let metadata = entry.metadata()?;
196 if metadata.is_file() {
197 total += metadata.len() as usize;
198 } else if metadata.is_dir() {
199 total += Self::calculate_disk_usage(&entry.path())?;
200 }
201 }
202 }
203 Ok(total)
204 }
205
206 #[must_use]
211 pub fn schema(&self) -> crate::admin::SchemaInfo {
212 let labels = self
213 .lpg_store()
214 .all_labels()
215 .into_iter()
216 .map(|name| crate::admin::LabelInfo {
217 name: name.clone(),
218 count: self.lpg_store().nodes_with_label(&name).count(),
219 })
220 .collect();
221
222 let edge_types = self
223 .lpg_store()
224 .all_edge_types()
225 .into_iter()
226 .map(|name| crate::admin::EdgeTypeInfo {
227 name: name.clone(),
228 count: self.lpg_store().edges_with_type(&name).count(),
229 })
230 .collect();
231
232 let property_keys = self.lpg_store().all_property_keys();
233
234 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
235 labels,
236 edge_types,
237 property_keys,
238 })
239 }
240
241 #[must_use]
243 pub fn list_indexes(&self) -> Vec<crate::admin::IndexInfo> {
244 self.catalog
245 .all_indexes()
246 .into_iter()
247 .map(|def| {
248 let label_name = self
249 .catalog
250 .get_label_name(def.label)
251 .unwrap_or_else(|| "?".into());
252 let prop_name = self
253 .catalog
254 .get_property_key_name(def.property_key)
255 .unwrap_or_else(|| "?".into());
256 crate::admin::IndexInfo {
257 name: format!("idx_{}_{}", label_name, prop_name),
258 index_type: format!("{:?}", def.index_type),
259 target: format!("{}:{}", label_name, prop_name),
260 unique: false,
261 cardinality: None,
262 size_bytes: None,
263 }
264 })
265 .collect()
266 }
267
268 #[must_use]
276 pub fn validate(&self) -> crate::admin::ValidationResult {
277 let mut result = crate::admin::ValidationResult::default();
278
279 for edge in self.lpg_store().all_edges() {
281 if self.lpg_store().get_node(edge.src).is_none() {
282 result.errors.push(crate::admin::ValidationError {
283 code: "DANGLING_SRC".to_string(),
284 message: format!(
285 "Edge {} references non-existent source node {}",
286 edge.id.0, edge.src.0
287 ),
288 context: Some(format!("edge:{}", edge.id.0)),
289 });
290 }
291 if self.lpg_store().get_node(edge.dst).is_none() {
292 result.errors.push(crate::admin::ValidationError {
293 code: "DANGLING_DST".to_string(),
294 message: format!(
295 "Edge {} references non-existent destination node {}",
296 edge.id.0, edge.dst.0
297 ),
298 context: Some(format!("edge:{}", edge.id.0)),
299 });
300 }
301 }
302
303 if self.lpg_store().node_count() > 0 && self.lpg_store().edge_count() == 0 {
305 result.warnings.push(crate::admin::ValidationWarning {
306 code: "NO_EDGES".to_string(),
307 message: "Database has nodes but no edges".to_string(),
308 context: None,
309 });
310 }
311
312 result
313 }
314
315 #[must_use]
319 pub fn wal_status(&self) -> crate::admin::WalStatus {
320 #[cfg(feature = "wal")]
321 if let Some(ref wal) = self.wal {
322 return crate::admin::WalStatus {
323 enabled: true,
324 path: self.config.path.as_ref().map(|p| p.join("wal")),
325 size_bytes: wal.size_bytes(),
326 record_count: wal.record_count() as usize,
327 last_checkpoint: wal.last_checkpoint_timestamp(),
328 current_epoch: self.lpg_store().current_epoch().as_u64(),
329 };
330 }
331
332 crate::admin::WalStatus {
333 enabled: false,
334 path: None,
335 size_bytes: 0,
336 record_count: 0,
337 last_checkpoint: None,
338 current_epoch: self.lpg_store().current_epoch().as_u64(),
339 }
340 }
341
342 pub fn wal_checkpoint(&self) -> Result<()> {
350 #[cfg(feature = "wal")]
351 if let Some(ref wal) = self.wal {
352 let epoch = self.lpg_store().current_epoch();
353 let transaction_id = self
354 .transaction_manager
355 .last_assigned_transaction_id()
356 .unwrap_or_else(|| self.transaction_manager.begin());
357 wal.checkpoint(transaction_id, epoch)?;
358 wal.sync()?;
359 }
360
361 #[cfg(feature = "grafeo-file")]
363 if let Some(ref fm) = self.file_manager {
364 let _ = self.checkpoint_to_file(fm, super::flush::FlushReason::Explicit)?;
365 }
366
367 Ok(())
368 }
369
    /// Reports whether change data capture (CDC) is active, as determined by `cdc_active()`.
    ///
    /// Only available when the `cdc` feature is compiled in.
    #[cfg(feature = "cdc")]
    #[must_use]
    pub fn is_cdc_enabled(&self) -> bool {
        self.cdc_active()
    }
380
381 #[cfg(feature = "cdc")]
385 pub fn set_cdc_enabled(&self, enabled: bool) {
386 self.cdc_enabled
387 .store(enabled, std::sync::atomic::Ordering::Relaxed);
388 }
389
390 #[cfg(feature = "cdc")]
398 pub fn history(
399 &self,
400 entity_id: impl Into<crate::cdc::EntityId>,
401 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
402 Ok(self.cdc_log.history(entity_id.into()))
403 }
404
405 #[cfg(feature = "cdc")]
411 pub fn history_since(
412 &self,
413 entity_id: impl Into<crate::cdc::EntityId>,
414 since_epoch: grafeo_common::types::EpochId,
415 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
416 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
417 }
418
419 #[cfg(feature = "cdc")]
425 pub fn changes_between(
426 &self,
427 start_epoch: grafeo_common::types::EpochId,
428 end_epoch: grafeo_common::types::EpochId,
429 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
430 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
431 }
432}