1use crate::{
2 context::{CCDBContext, Request},
3 data::{ColumnLayout, Data},
4 models::{
5 AssignmentMetaLite, ColumnMeta, ColumnType, ConstantSetMeta, DirectoryMeta, TypeTableMeta,
6 VariationMeta,
7 },
8 CCDBError, CCDBResult,
9};
10use chrono::{DateTime, Utc};
11use dashmap::DashMap;
12use gluex_core::{utils::resolve_path, Id, RunNumber};
13use parking_lot::{Mutex, MutexGuard};
14use rusqlite::{Connection, OpenFlags};
15use std::{
16 collections::{BTreeMap, HashMap, HashSet},
17 env,
18 path::Path,
19 sync::Arc,
20};
21
22fn normalize_path(base: &str, path: &str) -> String {
23 let mut segments: Vec<String> = Vec::new();
24 let mut push_parts = |value: &str| {
25 for part in value.split('/') {
26 if part.is_empty() || part == "." {
27 continue;
28 }
29 if part == ".." {
30 segments.pop();
31 } else {
32 segments.push(part.to_string());
33 }
34 }
35 };
36 if path.starts_with('/') {
37 push_parts(path);
38 } else {
39 push_parts(base);
40 push_parts(path);
41 }
42 if segments.is_empty() {
43 "/".to_string()
44 } else {
45 format!("/{}", segments.join("/"))
46 }
47}
48
49#[derive(Debug, Clone)]
51pub struct CCDB {
52 connection: Arc<Mutex<Connection>>,
53 connection_path: String,
54 variation_cache: Arc<DashMap<String, VariationMeta>>,
55 variation_chain_cache: Arc<DashMap<Id, Vec<VariationMeta>>>,
56 directory_meta: Arc<DashMap<Id, DirectoryMeta>>,
57 directory_by_path: Arc<DashMap<String, Id>>,
58 table_meta: Arc<DashMap<Id, TypeTableMeta>>,
59 table_by_dir_name: Arc<DashMap<(Id, String), Id>>,
60 column_layouts: Arc<DashMap<Id, Arc<ColumnLayout>>>,
61}
62
63impl CCDB {
64 pub fn new() -> CCDBResult<Self> {
70 let path = env::var("CCDB_CONNECTION")
71 .map_err(|_| CCDBError::MissingConnectionEnv("CCDB_CONNECTION".to_string()))?;
72 Self::open(path)
73 }
74
75 pub fn open(path: impl AsRef<Path>) -> CCDBResult<Self> {
81 let resolved_path = resolve_path(path)?;
82 let path_str = resolved_path.to_string_lossy().to_string();
83 let conn = Connection::open_with_flags(&resolved_path, OpenFlags::SQLITE_OPEN_READ_ONLY)?;
84 conn.pragma_update(None, "foreign_keys", "ON")?; let db = CCDB {
86 connection: Arc::new(Mutex::new(conn)),
87 variation_cache: Arc::new(DashMap::new()),
88 variation_chain_cache: Arc::new(DashMap::new()),
89 directory_meta: Arc::new(DashMap::new()),
90 directory_by_path: Arc::new(DashMap::new()),
91 table_meta: Arc::new(DashMap::new()),
92 table_by_dir_name: Arc::new(DashMap::new()),
93 column_layouts: Arc::new(DashMap::new()),
94 connection_path: path_str,
95 };
96 db.load_directories()?;
97 db.load_tables()?;
98 Ok(db)
99 }
100 pub fn connection(&self) -> MutexGuard<'_, Connection> {
102 self.connection.lock()
103 }
104 #[must_use]
106 pub fn connection_path(&self) -> &str {
107 &self.connection_path
108 }
109 fn load_directories(&self) -> CCDBResult<()> {
110 let connection = self.connection();
111 let mut stmt = connection.prepare(
112 "SELECT id, created, modified, name, parentId, authorId, comment,
113 isDeprecated, deprecatedByUserId, isLocked, lockedByUserId
114 FROM directories",
115 )?;
116 let rows = stmt.query_map([], |row| {
117 Ok(DirectoryMeta {
118 id: row.get(0)?,
119 created: row.get(1)?,
120 modified: row.get(2)?,
121 name: row.get(3)?,
122 parent_id: row.get(4)?,
123 author_id: row.get(5)?,
124 comment: row.get(6).unwrap_or_default(),
125 is_deprecated: row.get(7).unwrap_or_default(),
126 deprecated_by_user_id: row.get(8).unwrap_or_default(),
127 is_locked: row.get(9).unwrap_or_default(),
128 locked_by_user_id: row.get(10).unwrap_or_default(),
129 })
130 })?;
131 self.directory_meta.clear();
132 self.directory_by_path.clear();
133 for dir in rows {
134 let dir = dir?;
135 let id = dir.id;
136 let path = self.build_dir_path_from_meta(&dir);
137 self.directory_by_path.insert(path, id);
138 self.directory_meta.insert(id, dir);
139 }
140 Ok(())
141 }
142 fn build_dir_path_from_meta(&self, dir: &DirectoryMeta) -> String {
143 if dir.parent_id == 0 {
144 format!("/{}", dir.name)
145 } else if let Some(parent) = self.directory_meta.get(&dir.parent_id) {
146 let mut p = self.build_dir_path_from_meta(&parent);
147 if !p.ends_with('/') {
148 p.push('/');
149 }
150 p.push_str(&dir.name);
151 p
152 } else {
153 format!("/{}", dir.name)
154 }
155 }
156 fn load_tables(&self) -> CCDBResult<()> {
157 let connection = self.connection();
158 let mut stmt = connection.prepare(
159 "SELECT id, created, modified, directoryId, name,
160 nRows, nColumns, nAssignments, authorId, comment,
161 isDeprecated, deprecatedByUserId, isLocked, lockedByUserId, lockTime
162 FROM typeTables",
163 )?;
164 let rows = stmt.query_map([], |row| {
165 Ok(TypeTableMeta {
166 id: row.get(0)?,
167 created: row.get(1)?,
168 modified: row.get(2)?,
169 directory_id: row.get(3)?,
170 name: row.get(4)?,
171 n_rows: row.get(5)?,
172 n_columns: row.get(6)?,
173 n_assignments: row.get(7)?,
174 author_id: row.get(8)?,
175 comment: row.get(9).unwrap_or_default(),
176 is_deprecated: row.get(10).unwrap_or_default(),
177 deprecated_by_user_id: row.get(11).unwrap_or_default(),
178 is_locked: row.get(12).unwrap_or_default(),
179 locked_by_user_id: row.get(13).unwrap_or_default(),
180 lock_time: row.get(14).unwrap_or_default(),
181 })
182 })?;
183 self.table_meta.clear();
184 self.table_by_dir_name.clear();
185 for table in rows {
186 let table = table?;
187 let id = table.id;
188 let key = (table.directory_id, table.name.clone());
189 self.table_by_dir_name.insert(key, id);
190 self.table_meta.insert(id, table);
191 }
192 Ok(())
193 }
194
195 #[must_use]
197 pub fn root(&self) -> DirectoryHandle {
198 DirectoryHandle {
199 db: self.clone(),
200 meta: DirectoryMeta {
201 id: 0,
202 name: String::new(),
203 ..Default::default()
204 },
205 }
206 }
207
208 pub fn dir(&self, path: &str) -> CCDBResult<DirectoryHandle> {
214 if path == "/" || path.is_empty() {
215 return Ok(self.root());
216 }
217 let norm = normalize_path("/", path);
218 let id = self
219 .directory_by_path
220 .get(&norm)
221 .ok_or_else(|| CCDBError::DirectoryNotFoundError(norm.clone()))?;
222 let meta = self
223 .directory_meta
224 .get(&id)
225 .ok_or_else(|| CCDBError::DirectoryNotFoundError(norm.clone()))?;
226 Ok(DirectoryHandle {
227 db: self.clone(),
228 meta: meta.clone(),
229 })
230 }
231
232 pub fn table(&self, path: &str) -> CCDBResult<TypeTableHandle> {
238 let norm = normalize_path("/", path);
239 let (dir_path, table_name) = match norm.rsplit_once('/') {
240 Some((parent, name)) if !name.is_empty() => (parent, name),
241 _ => return Err(CCDBError::InvalidPathError(norm)),
242 };
243 let dir = self.dir(dir_path)?;
244 dir.table(table_name)
245 }
246 pub fn variation(&self, name: &str) -> CCDBResult<VariationMeta> {
252 if let Some(v) = self.variation_cache.get(name) {
253 return Ok(v.clone());
254 }
255 let connection = self.connection();
256 let mut stmt = connection.prepare_cached(
257 "SELECT id, created, modified, name, description, authorId, comment,
258 parentId, isLocked, lockTime, lockedByUserId,
259 goBackBehavior, goBackTime, isDeprecated, deprecatedByUserId
260 FROM variations
261 WHERE name = ?",
262 )?;
263 let mut rows = stmt.query([name])?;
264 if let Some(r) = rows.next()? {
265 let var = VariationMeta {
266 id: r.get(0)?,
267 created: r.get(1)?,
268 modified: r.get(2)?,
269 name: r.get(3)?,
270 description: r.get(4).unwrap_or_default(),
271 author_id: r.get(5)?,
272 comment: r.get(6).unwrap_or_default(),
273 parent_id: r.get(7)?,
274 is_locked: r.get(8).unwrap_or_default(),
275 lock_time: r.get(9).unwrap_or_default(),
276 locked_by_user_id: r.get(10).unwrap_or_default(),
277 go_back_behavior: r.get(11).unwrap_or_default(),
278 go_back_time: r.get(12).unwrap_or_default(),
279 is_deprecated: r.get(13).unwrap_or_default(),
280 deprecated_by_user_id: r.get(14).unwrap_or_default(),
281 };
282 self.variation_cache.insert(name.to_string(), var.clone());
283 Ok(var)
284 } else {
285 Err(CCDBError::VariationNotFoundError(name.to_string()))
286 }
287 }
288 pub fn variation_chain(&self, start: &VariationMeta) -> CCDBResult<Vec<VariationMeta>> {
294 if let Some(cached) = self.variation_chain_cache.get(&start.id) {
295 return Ok(cached.clone());
296 }
297 let mut chain = Vec::new();
298 let mut current = start.clone();
299
300 chain.push(current.clone());
301 let connection = self.connection();
302 let mut stmt = connection.prepare_cached(
303 "SELECT id, created, modified, name, description, authorId, comment,
304 parentId, isLocked, lockTime, lockedByUserId,
305 goBackBehavior, goBackTime, isDeprecated, deprecatedByUserId
306 FROM variations
307 WHERE id = ?",
308 )?;
309
310 while current.parent_id > 0 {
311 let mut rows = stmt.query([current.parent_id])?;
312 if let Some(r) = rows.next()? {
313 current = VariationMeta {
314 id: r.get(0)?,
315 created: r.get(1)?,
316 modified: r.get(2)?,
317 name: r.get(3)?,
318 description: r.get(4).unwrap_or_default(),
319 author_id: r.get(5)?,
320 comment: r.get(6).unwrap_or_default(),
321 parent_id: r.get(7)?,
322 is_locked: r.get(8).unwrap_or_default(),
323 lock_time: r.get(9).unwrap_or_default(),
324 locked_by_user_id: r.get(10).unwrap_or_default(),
325 go_back_behavior: r.get(11).unwrap_or(0),
326 go_back_time: r.get(12).unwrap_or_default(),
327 is_deprecated: r.get(13).unwrap_or_default(),
328 deprecated_by_user_id: r.get(14).unwrap_or_default(),
329 };
330 chain.push(current.clone());
331 } else {
332 break;
333 }
334 }
335
336 self.variation_chain_cache.insert(start.id, chain.clone());
337 Ok(chain)
338 }
339 pub fn request(&self, request_string: &str) -> CCDBResult<BTreeMap<RunNumber, Data>> {
346 let request: Request = request_string.parse()?;
347 let table = self.table(request.path.full_path())?;
348 table.fetch(&request.context)
349 }
350
351 pub fn fetch(&self, path: &str, ctx: &CCDBContext) -> CCDBResult<BTreeMap<RunNumber, Data>> {
357 let table = self.table(path)?;
358 table.fetch(ctx)
359 }
360}
361
362#[derive(Debug, Clone)]
364pub struct DirectoryHandle {
365 db: CCDB,
366 pub(crate) meta: DirectoryMeta,
367}
368
369impl DirectoryHandle {
370 #[must_use]
372 pub fn meta(&self) -> &DirectoryMeta {
373 &self.meta
374 }
375 #[must_use]
377 pub fn full_path(&self) -> String {
378 if self.meta.id == 0 {
379 "/".to_string()
380 } else {
381 let mut names = Vec::new();
382 let mut current = self.meta.clone();
383 loop {
384 if current.parent_id == 0 {
385 names.push(current.name.clone());
386 break;
387 }
388 names.push(current.name.clone());
389 if let Some(parent) = self.db.directory_meta.get(¤t.parent_id) {
390 current = parent.clone();
391 } else {
392 break;
393 }
394 }
395 names.reverse();
396 format!("/{}", names.join("/"))
397 }
398 }
399 #[must_use]
401 pub fn parent(&self) -> Option<Self> {
402 if self.meta.parent_id == 0 {
403 None
404 } else {
405 Some(DirectoryHandle {
406 db: self.db.clone(),
407 meta: self.db.directory_meta.get(&self.meta.parent_id)?.clone(),
408 })
409 }
410 }
411 #[must_use]
413 pub fn dirs(&self) -> Vec<DirectoryHandle> {
414 self.db
415 .directory_meta
416 .iter()
417 .filter(|meta| meta.parent_id == self.meta.id)
418 .map(|meta| DirectoryHandle {
419 db: self.db.clone(),
420 meta: meta.value().clone(),
421 })
422 .collect()
423 }
424 pub fn dir(&self, path: &str) -> CCDBResult<DirectoryHandle> {
430 let target = normalize_path(&self.full_path(), path);
431 self.db.dir(&target)
432 }
433 #[must_use]
435 pub fn tables(&self) -> Vec<TypeTableHandle> {
436 self.db
437 .table_meta
438 .iter()
439 .filter(|meta| meta.directory_id == self.meta.id)
440 .map(|meta| TypeTableHandle {
441 db: self.db.clone(),
442 meta: meta.value().clone(),
443 })
444 .collect()
445 }
446 pub fn table(&self, name: &str) -> CCDBResult<TypeTableHandle> {
452 let id = self
453 .db
454 .table_by_dir_name
455 .get(&(self.meta.id, name.to_string()))
456 .ok_or_else(|| {
457 CCDBError::TableNotFoundError(format!("{}/{}", self.full_path(), name))
458 })?;
459 let meta = self.db.table_meta.get(&id).ok_or_else(|| {
460 CCDBError::TableNotFoundError(format!("{}/{}", self.full_path(), name))
461 })?;
462 Ok(TypeTableHandle {
463 db: self.db.clone(),
464 meta: meta.clone(),
465 })
466 }
467}
468
469#[derive(Debug, Clone)]
471pub struct TypeTableHandle {
472 db: CCDB,
473 pub(crate) meta: TypeTableMeta,
474}
475impl TypeTableHandle {
476 #[must_use]
478 pub fn meta(&self) -> &TypeTableMeta {
479 &self.meta
480 }
481 #[must_use]
483 pub fn name(&self) -> &str {
484 &self.meta.name
485 }
486 #[must_use]
488 pub fn id(&self) -> Id {
489 self.meta.id
490 }
491 #[must_use]
493 pub fn full_path(&self) -> String {
494 let dir_meta = self.db.directory_meta.get(&self.meta.directory_id);
495 if let Some(dir_meta) = dir_meta {
496 let dir = DirectoryHandle {
497 db: self.db.clone(),
498 meta: dir_meta.clone(),
499 };
500 let mut p = dir.full_path();
501 if !p.ends_with('/') {
502 p.push('/');
503 }
504 p.push_str(&self.meta.name);
505 p
506 } else {
507 format!("/{}", self.meta.name)
508 }
509 }
510 pub fn columns(&self) -> CCDBResult<Vec<ColumnMeta>> {
517 Ok(self.column_layout()?.columns().to_vec())
518 }
519 fn load_column_metadata(&self) -> CCDBResult<Vec<ColumnMeta>> {
520 let connection = self.db.connection();
521 let mut stmt = connection.prepare_cached(
522 "SELECT id, created, modified, name, typeId, columnType, `order`, comment
523 FROM columns
524 WHERE typeId = ?
525 ORDER BY `order`",
526 )?;
527 let columns = stmt
528 .query_map([self.meta.id], |row| {
529 Ok(ColumnMeta {
530 id: row.get(0)?,
531 created: row.get(1)?,
532 modified: row.get(2)?,
533 name: row.get(3).unwrap_or_default(),
534 type_id: row.get(4)?,
535 column_type: ColumnType::type_from_str(&row.get::<_, String>(5)?)
536 .unwrap_or_default(),
537 order: row.get(6)?,
538 comment: row.get(7).unwrap_or_default(),
539 })
540 })?
541 .collect::<Result<Vec<ColumnMeta>, _>>()?;
542 Ok(columns)
543 }
544
545 fn column_layout(&self) -> CCDBResult<Arc<ColumnLayout>> {
546 if let Some(existing) = self.db.column_layouts.get(&self.meta.id) {
547 return Ok(existing.clone());
548 }
549 let columns = self.load_column_metadata()?;
550 let layout = Arc::new(ColumnLayout::new(columns));
551 self.db.column_layouts.insert(self.meta.id, layout.clone());
552 Ok(layout)
553 }
554 pub fn fetch(&self, ctx: &CCDBContext) -> CCDBResult<BTreeMap<RunNumber, Data>> {
561 let runs: Vec<RunNumber> = if ctx.runs.is_empty() {
562 vec![0]
563 } else {
564 ctx.runs.clone() };
566 let assignments = self.resolve_assignments(&runs, &ctx.variation, ctx.timestamp)?;
567 if assignments.is_empty() {
568 return Ok(BTreeMap::new());
569 }
570 self.load_vaults(&assignments)
571 }
572 fn resolve_assignments(
573 &self,
574 runs: &[RunNumber],
575 variation: &str,
576 timestamp: DateTime<Utc>,
577 ) -> CCDBResult<BTreeMap<RunNumber, Arc<ConstantSetMeta>>> {
578 if runs.is_empty() {
579 return Ok(BTreeMap::new());
580 }
581 let min_run = *runs.iter().min().expect("this is a bug, please report it!");
582 let max_run = *runs.iter().max().expect("this is a bug, please report it!");
583 let start_var_meta = self.db.variation(variation)?;
584 let var_chain = self.db.variation_chain(&start_var_meta)?;
585 let mut final_assignments: BTreeMap<RunNumber, Arc<ConstantSetMeta>> = BTreeMap::new();
586 let mut unresolved: HashSet<RunNumber> = runs.iter().copied().collect();
587 for var_meta in var_chain {
588 if unresolved.is_empty() {
589 break;
590 }
591 let partial = self.resolve_assignments_for_variation(
592 &unresolved,
593 &var_meta,
594 timestamp,
595 min_run,
596 max_run,
597 )?;
598 for (run, meta) in partial {
599 final_assignments.insert(run, meta);
600 unresolved.remove(&run);
601 }
602 }
603 Ok(final_assignments)
604 }
605 fn resolve_assignments_for_variation(
606 &self,
607 runs: &HashSet<RunNumber>,
608 var_meta: &VariationMeta,
609 timestamp: DateTime<Utc>,
610 min_run: RunNumber,
611 max_run: RunNumber,
612 ) -> CCDBResult<BTreeMap<RunNumber, Arc<ConstantSetMeta>>> {
613 let connection = self.db.connection();
614 let mut stmt = connection.prepare_cached(
615 "SELECT
616 a.id, a.created, a.constantSetId,
617 cs.id, cs.created, cs.modified, cs.vault, cs.constantTypeId,
618 rr.runMin, rr.runMax
619 FROM assignments a
620 JOIN constantSets cs ON cs.id = a.constantSetId
621 JOIN runRanges rr ON rr.id = a.runRangeId
622 WHERE cs.constantTypeId = ?
623 AND a.created <= datetime(?, 'unixepoch', 'localtime')
624 AND a.variationId = ?
625 AND rr.runMax >= ?
626 AND rr.runMin <= ?",
627 )?;
628 let valid_assignments = stmt
629 .query_map(
630 (
631 self.meta.id,
632 timestamp.timestamp(),
633 var_meta.id,
634 min_run,
635 max_run,
636 ),
637 |row| {
638 let meta = AssignmentMetaLite {
639 id: row.get(0)?,
640 created: row.get(1)?,
641 constant_set_id: row.get(2)?,
642 };
643 let constant_set = ConstantSetMeta {
644 id: row.get(3)?,
645 created: row.get(4)?,
646 modified: row.get(5)?,
647 vault: row.get(6)?,
648 constant_type_id: row.get(7)?,
649 };
650 let run_min: RunNumber = row.get(8)?;
651 let run_max: RunNumber = row.get(9)?;
652 Ok((meta, constant_set, run_min, run_max))
653 },
654 )?
655 .collect::<Result<Vec<(AssignmentMetaLite, ConstantSetMeta, RunNumber, RunNumber)>, _>>(
656 )?;
657 let mut best: BTreeMap<RunNumber, Arc<ConstantSetMeta>> = BTreeMap::new();
658 let mut best_created: HashMap<RunNumber, DateTime<Utc>> = HashMap::new(); let mut constant_set_cache: HashMap<Id, Arc<ConstantSetMeta>> = HashMap::new();
660 for &run in runs {
661 for (meta, constant_set, rmin, rmax) in &valid_assignments {
662 if run >= *rmin && run <= *rmax {
663 let cur_best = best_created.get(&run);
664 let created = meta.created()?;
665 if cur_best.is_none_or(|t| created > *t) {
666 let cs_entry = constant_set_cache
667 .entry(constant_set.id)
668 .or_insert_with(|| Arc::new(constant_set.clone()))
669 .clone();
670 best.insert(run, cs_entry);
671 best_created.insert(run, created);
672 }
673 }
674 }
675 }
676 Ok(best)
677 }
678 fn load_vaults(
679 &self,
680 assignments: &BTreeMap<RunNumber, Arc<ConstantSetMeta>>,
681 ) -> CCDBResult<BTreeMap<RunNumber, Data>> {
682 if assignments.is_empty() {
683 return Ok(BTreeMap::new());
684 }
685 let layout = self.column_layout()?;
686 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
687 let n_rows = self.meta.n_rows as usize;
688 assignments
689 .iter()
690 .map(|(run, constant_set)| {
691 Ok((
692 *run,
693 Data::from_vault(&constant_set.vault, layout.clone(), n_rows)?,
694 ))
695 })
696 .collect::<CCDBResult<BTreeMap<RunNumber, Data>>>()
697 }
698}