1use std::path::Path;
4
5use grafeo_common::utils::error::Result;
6
7impl super::GrafeoDB {
8 #[must_use]
14 pub fn node_count(&self) -> usize {
15 self.lpg_store().node_count()
16 }
17
18 #[must_use]
20 pub fn edge_count(&self) -> usize {
21 self.lpg_store().edge_count()
22 }
23
24 #[must_use]
26 pub fn label_count(&self) -> usize {
27 self.lpg_store().label_count()
28 }
29
30 #[must_use]
32 pub fn property_key_count(&self) -> usize {
33 self.lpg_store().property_key_count()
34 }
35
36 #[must_use]
38 pub fn edge_type_count(&self) -> usize {
39 self.lpg_store().edge_type_count()
40 }
41
42 #[must_use]
50 pub fn is_persistent(&self) -> bool {
51 self.config.path.is_some()
52 }
53
54 #[must_use]
58 pub fn path(&self) -> Option<&Path> {
59 self.config.path.as_deref()
60 }
61
62 #[must_use]
66 pub fn info(&self) -> crate::admin::DatabaseInfo {
67 crate::admin::DatabaseInfo {
68 mode: crate::admin::DatabaseMode::Lpg,
69 node_count: self.lpg_store().node_count(),
70 edge_count: self.lpg_store().edge_count(),
71 is_persistent: self.is_persistent(),
72 path: self.config.path.clone(),
73 wal_enabled: self.config.wal_enabled,
74 version: env!("CARGO_PKG_VERSION").to_string(),
75 features: {
76 let mut f = vec!["gql".into()];
77 if cfg!(feature = "cypher") {
78 f.push("cypher".into());
79 }
80 if cfg!(feature = "sparql") {
81 f.push("sparql".into());
82 }
83 if cfg!(feature = "gremlin") {
84 f.push("gremlin".into());
85 }
86 if cfg!(feature = "graphql") {
87 f.push("graphql".into());
88 }
89 if cfg!(feature = "sql-pgq") {
90 f.push("sql-pgq".into());
91 }
92 if cfg!(feature = "triple-store") {
93 f.push("rdf".into());
94 }
95 if cfg!(feature = "algos") {
96 f.push("algos".into());
97 }
98 if cfg!(feature = "vector-index") {
99 f.push("vector-index".into());
100 }
101 if cfg!(feature = "text-index") {
102 f.push("text-index".into());
103 }
104 if cfg!(feature = "hybrid-search") {
105 f.push("hybrid-search".into());
106 }
107 if cfg!(feature = "cdc") {
108 f.push("cdc".into());
109 }
110 f
111 },
112 }
113 }
114
115 #[must_use]
121 pub fn memory_usage(&self) -> crate::memory_usage::MemoryUsage {
122 use crate::memory_usage::{BufferManagerMemory, CacheMemory, MemoryUsage};
123 use grafeo_common::memory::MemoryRegion;
124
125 let (store, indexes, mvcc, string_pool) = self.lpg_store().memory_breakdown();
126
127 let (parsed_bytes, optimized_bytes, cached_plan_count) =
128 self.query_cache.heap_memory_bytes();
129 let mut caches = CacheMemory {
130 parsed_plan_cache_bytes: parsed_bytes,
131 optimized_plan_cache_bytes: optimized_bytes,
132 cached_plan_count,
133 ..Default::default()
134 };
135 caches.compute_total();
136
137 let bm_stats = self.buffer_manager.stats();
138 let buffer_manager = BufferManagerMemory {
139 budget_bytes: bm_stats.budget,
140 allocated_bytes: bm_stats.total_allocated,
141 graph_storage_bytes: bm_stats.region_usage(MemoryRegion::GraphStorage),
142 index_buffers_bytes: bm_stats.region_usage(MemoryRegion::IndexBuffers),
143 execution_buffers_bytes: bm_stats.region_usage(MemoryRegion::ExecutionBuffers),
144 spill_staging_bytes: bm_stats.region_usage(MemoryRegion::SpillStaging),
145 };
146
147 let mut usage = MemoryUsage {
148 store,
149 indexes,
150 mvcc,
151 caches,
152 string_pool,
153 buffer_manager,
154 ..Default::default()
155 };
156
157 #[cfg(feature = "triple-store")]
158 {
159 use crate::memory_usage::RdfMemory;
160 let (
161 triple_count,
162 triples_and_indexes_bytes,
163 term_dictionary_bytes,
164 ring_index_bytes,
165 named_graph_count,
166 ) = self.rdf_store.heap_memory_bytes();
167 usage.rdf = RdfMemory {
168 triple_count,
169 triples_and_indexes_bytes,
170 term_dictionary_bytes,
171 ring_index_bytes,
172 named_graph_count,
173 total_bytes: 0,
174 };
175 usage.rdf.compute_total();
176 }
177
178 #[cfg(feature = "cdc")]
179 {
180 use crate::memory_usage::CdcMemory;
181 let (total_bytes, entity_count, event_count) = self.cdc_log.heap_memory_bytes();
182 usage.cdc = CdcMemory {
183 total_bytes,
184 entity_count,
185 event_count,
186 };
187 }
188
189 usage.compute_total();
190 usage
191 }
192
193 #[must_use]
197 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
198 #[cfg(feature = "wal")]
199 let disk_bytes = self.config.path.as_ref().and_then(|p| {
200 if p.exists() {
201 Self::calculate_disk_usage(p).ok()
202 } else {
203 None
204 }
205 });
206 #[cfg(not(feature = "wal"))]
207 let disk_bytes: Option<usize> = None;
208
209 crate::admin::DatabaseStats {
210 node_count: self.lpg_store().node_count(),
211 edge_count: self.lpg_store().edge_count(),
212 label_count: self.lpg_store().label_count(),
213 edge_type_count: self.lpg_store().edge_type_count(),
214 property_key_count: self.lpg_store().property_key_count(),
215 index_count: self.catalog.index_count(),
216 memory_bytes: self.memory_usage().total_bytes,
217 disk_bytes,
218 }
219 }
220
221 #[cfg(feature = "wal")]
223 fn calculate_disk_usage(path: &Path) -> Result<usize> {
224 let mut total = 0usize;
225 if path.is_dir() {
226 for entry in std::fs::read_dir(path)? {
227 let entry = entry?;
228 let metadata = entry.metadata()?;
229 if metadata.is_file() {
230 #[allow(clippy::cast_possible_truncation)]
232 let file_len = metadata.len() as usize;
233 total += file_len;
234 } else if metadata.is_dir() {
235 total += Self::calculate_disk_usage(&entry.path())?;
236 }
237 }
238 }
239 Ok(total)
240 }
241
242 #[must_use]
247 pub fn schema(&self) -> crate::admin::SchemaInfo {
248 let labels = self
249 .lpg_store()
250 .all_labels()
251 .into_iter()
252 .map(|name| crate::admin::LabelInfo {
253 name: name.clone(),
254 count: self.lpg_store().nodes_with_label(&name).count(),
255 })
256 .collect();
257
258 let edge_types = self
259 .lpg_store()
260 .all_edge_types()
261 .into_iter()
262 .map(|name| crate::admin::EdgeTypeInfo {
263 name: name.clone(),
264 count: self.lpg_store().edges_with_type(&name).count(),
265 })
266 .collect();
267
268 let property_keys = self.lpg_store().all_property_keys();
269
270 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
271 labels,
272 edge_types,
273 property_keys,
274 })
275 }
276
277 #[must_use]
279 pub fn list_indexes(&self) -> Vec<crate::admin::IndexInfo> {
280 self.catalog
281 .all_indexes()
282 .into_iter()
283 .map(|def| {
284 let label_name = self
285 .catalog
286 .get_label_name(def.label)
287 .unwrap_or_else(|| "?".into());
288 let prop_name = self
289 .catalog
290 .get_property_key_name(def.property_key)
291 .unwrap_or_else(|| "?".into());
292 crate::admin::IndexInfo {
293 name: format!("idx_{}_{}", label_name, prop_name),
294 index_type: format!("{:?}", def.index_type),
295 target: format!("{}:{}", label_name, prop_name),
296 unique: false,
297 cardinality: None,
298 size_bytes: None,
299 }
300 })
301 .collect()
302 }
303
304 #[must_use]
312 pub fn validate(&self) -> crate::admin::ValidationResult {
313 let mut result = crate::admin::ValidationResult::default();
314
315 for edge in self.lpg_store().all_edges() {
317 if self.lpg_store().get_node(edge.src).is_none() {
318 result.errors.push(crate::admin::ValidationError {
319 code: "DANGLING_SRC".to_string(),
320 message: format!(
321 "Edge {} references non-existent source node {}",
322 edge.id.0, edge.src.0
323 ),
324 context: Some(format!("edge:{}", edge.id.0)),
325 });
326 }
327 if self.lpg_store().get_node(edge.dst).is_none() {
328 result.errors.push(crate::admin::ValidationError {
329 code: "DANGLING_DST".to_string(),
330 message: format!(
331 "Edge {} references non-existent destination node {}",
332 edge.id.0, edge.dst.0
333 ),
334 context: Some(format!("edge:{}", edge.id.0)),
335 });
336 }
337 }
338
339 if self.lpg_store().node_count() > 0 && self.lpg_store().edge_count() == 0 {
341 result.warnings.push(crate::admin::ValidationWarning {
342 code: "NO_EDGES".to_string(),
343 message: "Database has nodes but no edges".to_string(),
344 context: None,
345 });
346 }
347
348 result
349 }
350
351 #[must_use]
355 pub fn wal_status(&self) -> crate::admin::WalStatus {
356 #[cfg(feature = "wal")]
357 if let Some(ref wal) = self.wal {
358 return crate::admin::WalStatus {
359 enabled: true,
360 path: self.config.path.as_ref().map(|p| p.join("wal")),
361 size_bytes: wal.size_bytes(),
362 #[allow(clippy::cast_possible_truncation)]
364 record_count: wal.record_count() as usize,
365 last_checkpoint: wal.last_checkpoint_timestamp(),
366 current_epoch: self.lpg_store().current_epoch().as_u64(),
367 };
368 }
369
370 crate::admin::WalStatus {
371 enabled: false,
372 path: None,
373 size_bytes: 0,
374 record_count: 0,
375 last_checkpoint: None,
376 current_epoch: self.lpg_store().current_epoch().as_u64(),
377 }
378 }
379
    /// Forces a checkpoint of the database's durable state.
    ///
    /// Steps, in order:
    /// 1. If the database is read-only, returns `Ok(())` without doing anything.
    /// 2. With the `wal` feature and a WAL attached: records a checkpoint for
    ///    the current store epoch and then calls `sync` on the WAL.
    /// 3. With the `grafeo-file` feature and a file manager attached: calls
    ///    `checkpoint_to_file` with `FlushReason::Explicit`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the WAL checkpoint, the WAL sync, or
    /// the file checkpoint.
    pub fn wal_checkpoint(&self) -> Result<()> {
        // Read-only databases are treated as a successful no-op.
        if self.read_only {
            return Ok(());
        }

        #[cfg(feature = "wal")]
        if let Some(ref wal) = self.wal {
            let epoch = self.lpg_store().current_epoch();
            // Use the most recently assigned transaction id; if none has been
            // assigned yet, obtain one by beginning a transaction.
            // NOTE(review): the transaction started by `begin()` here is never
            // explicitly finished in this function — confirm that is intended.
            let transaction_id = self
                .transaction_manager
                .last_assigned_transaction_id()
                .unwrap_or_else(|| self.transaction_manager.begin());
            wal.checkpoint(transaction_id, epoch)?;
            wal.sync()?;
        }

        #[cfg(feature = "grafeo-file")]
        // The success value of the file checkpoint is deliberately discarded;
        // only its error is propagated.
        if let Some(ref fm) = self.file_manager {
            let _ = self.checkpoint_to_file(fm, super::flush::FlushReason::Explicit)?;
        }

        Ok(())
    }
413
414 #[cfg(feature = "cdc")]
420 #[must_use]
421 pub fn is_cdc_enabled(&self) -> bool {
422 self.cdc_active()
423 }
424
425 #[cfg(feature = "cdc")]
429 pub fn set_cdc_enabled(&self, enabled: bool) {
430 self.cdc_enabled
431 .store(enabled, std::sync::atomic::Ordering::Relaxed);
432 }
433
434 #[cfg(feature = "cdc")]
442 pub fn history(
443 &self,
444 entity_id: impl Into<crate::cdc::EntityId>,
445 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
446 Ok(self.cdc_log.history(entity_id.into()))
447 }
448
449 #[cfg(feature = "cdc")]
455 pub fn history_since(
456 &self,
457 entity_id: impl Into<crate::cdc::EntityId>,
458 since_epoch: grafeo_common::types::EpochId,
459 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
460 Ok(self.cdc_log.history_since(entity_id.into(), since_epoch))
461 }
462
463 #[cfg(feature = "cdc")]
469 pub fn changes_between(
470 &self,
471 start_epoch: grafeo_common::types::EpochId,
472 end_epoch: grafeo_common::types::EpochId,
473 ) -> Result<Vec<crate::cdc::ChangeEvent>> {
474 Ok(self.cdc_log.changes_between(start_epoch, end_epoch))
475 }
476}