1use std::path::Path;
4
5use grafeo_common::utils::error::Result;
6
7impl super::GrafeoDB {
8 #[must_use]
14 pub fn node_count(&self) -> usize {
15 self.lpg_store().node_count()
16 }
17
18 #[must_use]
20 pub fn edge_count(&self) -> usize {
21 self.lpg_store().edge_count()
22 }
23
24 #[must_use]
26 pub fn label_count(&self) -> usize {
27 self.lpg_store().label_count()
28 }
29
30 #[must_use]
32 pub fn property_key_count(&self) -> usize {
33 self.lpg_store().property_key_count()
34 }
35
36 #[must_use]
38 pub fn edge_type_count(&self) -> usize {
39 self.lpg_store().edge_type_count()
40 }
41
42 #[must_use]
50 pub fn is_persistent(&self) -> bool {
51 self.config.path.is_some()
52 }
53
54 #[must_use]
58 pub fn path(&self) -> Option<&Path> {
59 self.config.path.as_deref()
60 }
61
62 #[must_use]
66 pub fn info(&self) -> crate::admin::DatabaseInfo {
67 crate::admin::DatabaseInfo {
68 mode: crate::admin::DatabaseMode::Lpg,
69 node_count: self.lpg_store().node_count(),
70 edge_count: self.lpg_store().edge_count(),
71 is_persistent: self.is_persistent(),
72 path: self.config.path.clone(),
73 wal_enabled: self.config.wal_enabled,
74 version: env!("CARGO_PKG_VERSION").to_string(),
75 features: {
76 let mut f = vec!["gql".into()];
77 if cfg!(feature = "cypher") {
78 f.push("cypher".into());
79 }
80 if cfg!(feature = "sparql") {
81 f.push("sparql".into());
82 }
83 if cfg!(feature = "gremlin") {
84 f.push("gremlin".into());
85 }
86 if cfg!(feature = "graphql") {
87 f.push("graphql".into());
88 }
89 if cfg!(feature = "sql-pgq") {
90 f.push("sql-pgq".into());
91 }
92 if cfg!(feature = "triple-store") {
93 f.push("rdf".into());
94 }
95 if cfg!(feature = "algos") {
96 f.push("algos".into());
97 }
98 if cfg!(feature = "vector-index") {
99 f.push("vector-index".into());
100 }
101 if cfg!(feature = "text-index") {
102 f.push("text-index".into());
103 }
104 if cfg!(feature = "hybrid-search") {
105 f.push("hybrid-search".into());
106 }
107 if cfg!(feature = "cdc") {
108 f.push("cdc".into());
109 }
110 f
111 },
112 }
113 }
114
115 #[must_use]
121 pub fn memory_usage(&self) -> crate::memory_usage::MemoryUsage {
122 use crate::memory_usage::{BufferManagerMemory, CacheMemory, MemoryUsage};
123 use grafeo_common::memory::MemoryRegion;
124
125 let (store, indexes, mvcc, string_pool) = self.lpg_store().memory_breakdown();
126
127 let (parsed_bytes, optimized_bytes, cached_plan_count) =
128 self.query_cache.heap_memory_bytes();
129 let mut caches = CacheMemory {
130 parsed_plan_cache_bytes: parsed_bytes,
131 optimized_plan_cache_bytes: optimized_bytes,
132 cached_plan_count,
133 ..Default::default()
134 };
135 caches.compute_total();
136
137 let bm_stats = self.buffer_manager.stats();
138 let buffer_manager = BufferManagerMemory {
139 budget_bytes: bm_stats.budget,
140 allocated_bytes: bm_stats.total_allocated,
141 graph_storage_bytes: bm_stats.region_usage(MemoryRegion::GraphStorage),
142 index_buffers_bytes: bm_stats.region_usage(MemoryRegion::IndexBuffers),
143 execution_buffers_bytes: bm_stats.region_usage(MemoryRegion::ExecutionBuffers),
144 spill_staging_bytes: bm_stats.region_usage(MemoryRegion::SpillStaging),
145 };
146
147 let mut usage = MemoryUsage {
148 store,
149 indexes,
150 mvcc,
151 caches,
152 string_pool,
153 buffer_manager,
154 ..Default::default()
155 };
156 usage.compute_total();
157 usage
158 }
159
160 #[must_use]
164 pub fn detailed_stats(&self) -> crate::admin::DatabaseStats {
165 #[cfg(feature = "wal")]
166 let disk_bytes = self.config.path.as_ref().and_then(|p| {
167 if p.exists() {
168 Self::calculate_disk_usage(p).ok()
169 } else {
170 None
171 }
172 });
173 #[cfg(not(feature = "wal"))]
174 let disk_bytes: Option<usize> = None;
175
176 crate::admin::DatabaseStats {
177 node_count: self.lpg_store().node_count(),
178 edge_count: self.lpg_store().edge_count(),
179 label_count: self.lpg_store().label_count(),
180 edge_type_count: self.lpg_store().edge_type_count(),
181 property_key_count: self.lpg_store().property_key_count(),
182 index_count: self.catalog.index_count(),
183 memory_bytes: self.memory_usage().total_bytes,
184 disk_bytes,
185 }
186 }
187
/// Returns the total size in bytes of the data stored at `path`.
///
/// * If `path` is a regular file, its length is returned.
/// * If `path` is a directory, the lengths of all regular files beneath it
///   are summed recursively. Entries that are neither regular files nor
///   directories are skipped.
/// * Any other path (including one that does not exist) yields `0`.
///
/// Sizes saturate at `usize::MAX` instead of silently truncating or
/// wrapping on targets where `usize` is narrower than `u64`.
///
/// # Errors
///
/// Propagates any I/O error raised while reading metadata or listing a
/// directory.
#[cfg(feature = "wal")]
fn calculate_disk_usage(path: &Path) -> Result<usize> {
    // File lengths are `u64`; clamp instead of truncating on narrow targets.
    let clamp_len = |len: u64| usize::try_from(len).unwrap_or(usize::MAX);

    // A database stored as a single file has a size too; previously any
    // non-directory path was reported as 0 bytes.
    if path.is_file() {
        return Ok(clamp_len(std::fs::metadata(path)?.len()));
    }
    if !path.is_dir() {
        return Ok(0);
    }

    let mut total = 0usize;
    for entry in std::fs::read_dir(path)? {
        let entry = entry?;
        let metadata = entry.metadata()?;
        if metadata.is_file() {
            total = total.saturating_add(clamp_len(metadata.len()));
        } else if metadata.is_dir() {
            total = total.saturating_add(Self::calculate_disk_usage(&entry.path())?);
        }
    }
    Ok(total)
}
208
209 #[must_use]
214 pub fn schema(&self) -> crate::admin::SchemaInfo {
215 let labels = self
216 .lpg_store()
217 .all_labels()
218 .into_iter()
219 .map(|name| crate::admin::LabelInfo {
220 name: name.clone(),
221 count: self.lpg_store().nodes_with_label(&name).count(),
222 })
223 .collect();
224
225 let edge_types = self
226 .lpg_store()
227 .all_edge_types()
228 .into_iter()
229 .map(|name| crate::admin::EdgeTypeInfo {
230 name: name.clone(),
231 count: self.lpg_store().edges_with_type(&name).count(),
232 })
233 .collect();
234
235 let property_keys = self.lpg_store().all_property_keys();
236
237 crate::admin::SchemaInfo::Lpg(crate::admin::LpgSchemaInfo {
238 labels,
239 edge_types,
240 property_keys,
241 })
242 }
243
244 #[must_use]
246 pub fn list_indexes(&self) -> Vec<crate::admin::IndexInfo> {
247 self.catalog
248 .all_indexes()
249 .into_iter()
250 .map(|def| {
251 let label_name = self
252 .catalog
253 .get_label_name(def.label)
254 .unwrap_or_else(|| "?".into());
255 let prop_name = self
256 .catalog
257 .get_property_key_name(def.property_key)
258 .unwrap_or_else(|| "?".into());
259 crate::admin::IndexInfo {
260 name: format!("idx_{}_{}", label_name, prop_name),
261 index_type: format!("{:?}", def.index_type),
262 target: format!("{}:{}", label_name, prop_name),
263 unique: false,
264 cardinality: None,
265 size_bytes: None,
266 }
267 })
268 .collect()
269 }
270
271 #[must_use]
279 pub fn validate(&self) -> crate::admin::ValidationResult {
280 let mut result = crate::admin::ValidationResult::default();
281
282 for edge in self.lpg_store().all_edges() {
284 if self.lpg_store().get_node(edge.src).is_none() {
285 result.errors.push(crate::admin::ValidationError {
286 code: "DANGLING_SRC".to_string(),
287 message: format!(
288 "Edge {} references non-existent source node {}",
289 edge.id.0, edge.src.0
290 ),
291 context: Some(format!("edge:{}", edge.id.0)),
292 });
293 }
294 if self.lpg_store().get_node(edge.dst).is_none() {
295 result.errors.push(crate::admin::ValidationError {
296 code: "DANGLING_DST".to_string(),
297 message: format!(
298 "Edge {} references non-existent destination node {}",
299 edge.id.0, edge.dst.0
300 ),
301 context: Some(format!("edge:{}", edge.id.0)),
302 });
303 }
304 }
305
306 if self.lpg_store().node_count() > 0 && self.lpg_store().edge_count() == 0 {
308 result.warnings.push(crate::admin::ValidationWarning {
309 code: "NO_EDGES".to_string(),
310 message: "Database has nodes but no edges".to_string(),
311 context: None,
312 });
313 }
314
315 result
316 }
317
318 #[must_use]
322 pub fn wal_status(&self) -> crate::admin::WalStatus {
323 #[cfg(feature = "wal")]
324 if let Some(ref wal) = self.wal {
325 return crate::admin::WalStatus {
326 enabled: true,
327 path: self.config.path.as_ref().map(|p| p.join("wal")),
328 size_bytes: wal.size_bytes(),
329 #[allow(clippy::cast_possible_truncation)]
331 record_count: wal.record_count() as usize,
332 last_checkpoint: wal.last_checkpoint_timestamp(),
333 current_epoch: self.lpg_store().current_epoch().as_u64(),
334 };
335 }
336
337 crate::admin::WalStatus {
338 enabled: false,
339 path: None,
340 size_bytes: 0,
341 record_count: 0,
342 last_checkpoint: None,
343 current_epoch: self.lpg_store().current_epoch().as_u64(),
344 }
345 }
346
347 pub fn wal_checkpoint(&self) -> Result<()> {
355 if self.read_only {
358 return Ok(());
359 }
360
361 #[cfg(feature = "wal")]
362 if let Some(ref wal) = self.wal {
363 let epoch = self.lpg_store().current_epoch();
364 let transaction_id = self
365 .transaction_manager
366 .last_assigned_transaction_id()
367 .unwrap_or_else(|| self.transaction_manager.begin());
368 wal.checkpoint(transaction_id, epoch)?;
369 wal.sync()?;
370 }
371
372 #[cfg(feature = "grafeo-file")]
374 if let Some(ref fm) = self.file_manager {
375 let _ = self.checkpoint_to_file(fm, super::flush::FlushReason::Explicit)?;
376 }
377
378 Ok(())
379 }
380
/// Returns whether change data capture is currently active, as reported
/// by `cdc_active`.
#[cfg(feature = "cdc")]
#[must_use]
pub fn is_cdc_enabled(&self) -> bool {
    Self::cdc_active(self)
}
391
/// Stores `enabled` into the `cdc_enabled` atomic flag.
///
/// The store uses relaxed ordering.
/// NOTE(review): presumably no other memory needs to be synchronized with
/// this flag - confirm.
#[cfg(feature = "cdc")]
pub fn set_cdc_enabled(&self, enabled: bool) {
    use std::sync::atomic::Ordering;

    self.cdc_enabled.store(enabled, Ordering::Relaxed);
}
400
/// Returns the change events the CDC log holds for `entity_id`.
///
/// `entity_id` accepts anything convertible into a
/// [`crate::cdc::EntityId`].
///
/// # Errors
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let target: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history(target);
    Ok(events)
}
415
/// Returns the change events the CDC log holds for `entity_id`, limited by
/// `since_epoch` as interpreted by the CDC log's `history_since`.
///
/// `entity_id` accepts anything convertible into a
/// [`crate::cdc::EntityId`].
///
/// # Errors
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn history_since(
    &self,
    entity_id: impl Into<crate::cdc::EntityId>,
    since_epoch: grafeo_common::types::EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let target: crate::cdc::EntityId = entity_id.into();
    let events = self.cdc_log.history_since(target, since_epoch);
    Ok(events)
}
429
/// Returns the change events the CDC log reports for the epoch range
/// given by `start_epoch` and `end_epoch`.
///
/// Whether the bounds are inclusive is decided by the CDC log's
/// `changes_between`, which is not visible from here.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cdc")]
pub fn changes_between(
    &self,
    start_epoch: grafeo_common::types::EpochId,
    end_epoch: grafeo_common::types::EpochId,
) -> Result<Vec<crate::cdc::ChangeEvent>> {
    let events = self.cdc_log.changes_between(start_epoch, end_epoch);
    Ok(events)
}
443}