1use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::sync::Arc;
12
13use tokio::sync::{RwLock, broadcast};
14
15use crate::directory_config::error::{DirectoryConfigError, DirectoryConfigResult};
16use crate::directory_config::refresh::refresh_loop;
17use crate::directory_config::types::{
18 ChangeEvent, ChangeOperation, DirectoryConfigStoreConfig, WriteMode, WriteResult,
19};
20
21pub(crate) type TableCache = Arc<RwLock<HashMap<String, serde_yaml_ng::Value>>>;
23
24pub(crate) type TimestampCache = Arc<RwLock<HashMap<String, std::time::SystemTime>>>;
26
27#[derive(Debug)]
34pub struct DirectoryConfigStore {
35 config: DirectoryConfigStoreConfig,
36 cache: TableCache,
37 timestamps: TimestampCache,
38 write_mode: WriteMode,
39 change_tx: broadcast::Sender<ChangeEvent>,
40 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
41}
42
43impl DirectoryConfigStore {
44 pub async fn new(config: DirectoryConfigStoreConfig) -> DirectoryConfigResult<Self> {
47 let dir = &config.directory;
48
49 if !dir.exists() || !dir.is_dir() {
50 return Err(DirectoryConfigError::DirectoryNotFound(
51 dir.display().to_string(),
52 ));
53 }
54
55 let write_mode = detect_write_mode(dir, config.git_enabled);
56 let cache: TableCache = Arc::new(RwLock::new(HashMap::new()));
57 let timestamps: TimestampCache = Arc::new(RwLock::new(HashMap::new()));
58
59 let (change_tx, _) = broadcast::channel(64);
61
62 let store = Self {
63 config,
64 cache: cache.clone(),
65 timestamps: timestamps.clone(),
66 write_mode,
67 change_tx,
68 shutdown_tx: None,
69 };
70
71 load_all_tables(&store.config.directory, &cache, ×tamps).await?;
73
74 tracing::info!(
75 directory = %store.config.directory.display(),
76 write_mode = ?store.write_mode,
77 tables = store.cache.read().await.len(),
78 "DirectoryConfigStore initialised"
79 );
80
81 Ok(store)
82 }
83
84 pub async fn start(&mut self) -> DirectoryConfigResult<()> {
86 if self.shutdown_tx.is_some() {
87 return Err(DirectoryConfigError::AlreadyRunning);
88 }
89
90 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
91 self.shutdown_tx = Some(shutdown_tx);
92
93 let cache = self.cache.clone();
94 let timestamps = self.timestamps.clone();
95 let directory = self.config.directory.clone();
96 let interval = self.config.refresh_interval;
97 let change_tx = self.change_tx.clone();
98
99 tokio::spawn(async move {
100 refresh_loop(
101 cache,
102 timestamps,
103 directory,
104 interval,
105 change_tx,
106 shutdown_rx,
107 )
108 .await;
109 });
110
111 tracing::info!(
112 interval_secs = self.config.refresh_interval.as_secs(),
113 "Background refresh started"
114 );
115
116 Ok(())
117 }
118
119 pub async fn stop(&mut self) -> DirectoryConfigResult<()> {
121 if let Some(tx) = self.shutdown_tx.take() {
122 let _ = tx.send(());
123 tracing::info!("Background refresh stopped");
124 Ok(())
125 } else {
126 Err(DirectoryConfigError::NotStarted)
127 }
128 }
129
130 pub async fn list_tables(&self) -> Vec<String> {
132 let cache = self.cache.read().await;
133 let mut tables: Vec<String> = cache.keys().cloned().collect();
134 tables.sort();
135 tables
136 }
137
138 pub async fn get(&self, table: &str) -> DirectoryConfigResult<serde_yaml_ng::Value> {
142 validate_table_name(table)?;
143 let table = normalize_table_name(table);
144 let cache = self.cache.read().await;
145 cache
146 .get(table)
147 .cloned()
148 .ok_or_else(|| DirectoryConfigError::TableNotFound(table.to_string()))
149 }
150
151 pub async fn get_key(
157 &self,
158 table: &str,
159 key: &str,
160 ) -> DirectoryConfigResult<serde_yaml_ng::Value> {
161 let value = self.get(table).await?;
162 let table = normalize_table_name(table);
163 navigate_yaml(&value, key).ok_or_else(|| DirectoryConfigError::KeyNotFound {
164 table: table.to_string(),
165 key: key.to_string(),
166 })
167 }
168
169 pub async fn get_as<T: serde::de::DeserializeOwned>(
173 &self,
174 table: &str,
175 ) -> DirectoryConfigResult<T> {
176 let value = self.get(table).await?;
177 let table = normalize_table_name(table);
178 serde_yaml_ng::from_value(value).map_err(|e| DirectoryConfigError::ParseError {
179 file: table.to_string(),
180 message: e.to_string(),
181 })
182 }
183
184 pub async fn set(
188 &self,
189 table: &str,
190 key: &str,
191 value: serde_yaml_ng::Value,
192 message: Option<&str>,
193 ) -> DirectoryConfigResult<WriteResult> {
194 self.check_writable()?;
195 validate_table_name(table)?;
196
197 let table = normalize_table_name(table);
198 let file_path = self.table_path(table);
199
200 if let Some(parent) = file_path.parent()
202 && !parent.exists()
203 {
204 tokio::fs::create_dir_all(parent).await?;
205 }
206
207 let mut doc = if file_path.exists() {
209 load_yaml_file(&file_path)?
210 } else {
211 serde_yaml_ng::Value::Mapping(serde_yaml_ng::Mapping::new())
212 };
213
214 set_yaml_key(&mut doc, key, value);
216
217 write_yaml_locked(&file_path, &doc)?;
219
220 {
222 let mut cache = self.cache.write().await;
223 cache.insert(table.to_string(), doc);
224 }
225
226 let (branch, commit) = self.maybe_git_commit(table, message).await?;
228
229 let result = WriteResult {
230 table: table.to_string(),
231 operation: ChangeOperation::Updated,
232 branch,
233 commit,
234 };
235
236 let _ = self.change_tx.send(ChangeEvent {
238 table: table.to_string(),
239 operation: ChangeOperation::Updated,
240 });
241
242 Ok(result)
243 }
244
245 pub async fn delete_key(
251 &self,
252 table: &str,
253 key: &str,
254 message: Option<&str>,
255 ) -> DirectoryConfigResult<WriteResult> {
256 self.check_writable()?;
257 validate_table_name(table)?;
258
259 let table = normalize_table_name(table);
260 let file_path = self.table_path(table);
261 if !file_path.exists() {
262 return Err(DirectoryConfigError::TableNotFound(table.to_string()));
263 }
264
265 let mut doc = load_yaml_file(&file_path)?;
266
267 if !remove_yaml_key(&mut doc, key) {
268 return Err(DirectoryConfigError::KeyNotFound {
269 table: table.to_string(),
270 key: key.to_string(),
271 });
272 }
273
274 write_yaml_locked(&file_path, &doc)?;
275
276 {
278 let mut cache = self.cache.write().await;
279 cache.insert(table.to_string(), doc);
280 }
281
282 let (branch, commit) = self.maybe_git_commit(table, message).await?;
283
284 let result = WriteResult {
285 table: table.to_string(),
286 operation: ChangeOperation::Deleted,
287 branch,
288 commit,
289 };
290
291 let _ = self.change_tx.send(ChangeEvent {
292 table: table.to_string(),
293 operation: ChangeOperation::Deleted,
294 });
295
296 Ok(result)
297 }
298
299 #[must_use]
304 pub fn on_change(&self) -> broadcast::Receiver<ChangeEvent> {
305 self.change_tx.subscribe()
306 }
307
308 #[must_use]
310 pub fn write_mode(&self) -> WriteMode {
311 self.write_mode
312 }
313
314 #[must_use]
316 pub fn is_git(&self) -> bool {
317 self.write_mode == WriteMode::GitCommit
318 }
319
/// Returns the branch reported by `git_current_branch` for the store directory, if any.
#[cfg(feature = "directory-config-git")]
#[must_use]
pub fn current_branch(&self) -> Option<String> {
    let repo_dir = &self.config.directory;
    crate::directory_config::git::git_current_branch(repo_dir)
}
326
/// Lists the branches of the store directory's git repository.
///
/// # Errors
///
/// Returns `NotGitRepo` unless the store is in git-commit mode; otherwise
/// propagates whatever `git_list_branches` returns.
#[cfg(feature = "directory-config-git")]
pub fn list_branches(&self) -> DirectoryConfigResult<Vec<String>> {
    if self.is_git() {
        crate::directory_config::git::git_list_branches(&self.config.directory)
    } else {
        Err(DirectoryConfigError::NotGitRepo)
    }
}
335
/// Switches the store directory's git repository to `branch`.
///
/// `create` is passed through unchanged to `git_switch_branch`.
///
/// # Errors
///
/// Returns `NotGitRepo` unless the store is in git-commit mode; otherwise
/// propagates whatever `git_switch_branch` returns.
#[cfg(feature = "directory-config-git")]
pub fn switch_branch(&self, branch: &str, create: bool) -> DirectoryConfigResult<()> {
    if self.is_git() {
        crate::directory_config::git::git_switch_branch(&self.config.directory, branch, create)
    } else {
        Err(DirectoryConfigError::NotGitRepo)
    }
}
344
345 fn check_writable(&self) -> DirectoryConfigResult<()> {
348 if self.write_mode == WriteMode::ReadOnly {
349 return Err(DirectoryConfigError::ReadOnly);
350 }
351 Ok(())
352 }
353
354 fn table_path(&self, table: &str) -> PathBuf {
360 let table = normalize_table_name(table);
361 let yaml_path = self.config.directory.join(format!("{table}.yaml"));
362 if yaml_path.exists() {
363 return yaml_path;
364 }
365 let yml_path = self.config.directory.join(format!("{table}.yml"));
366 if yml_path.exists() {
367 return yml_path;
368 }
369 yaml_path
371 }
372
373 #[allow(unused_variables)]
374 async fn maybe_git_commit(
375 &self,
376 table: &str,
377 message: Option<&str>,
378 ) -> DirectoryConfigResult<(Option<String>, Option<String>)> {
379 if self.write_mode != WriteMode::GitCommit {
380 return Ok((None, None));
381 }
382
383 #[cfg(feature = "directory-config-git")]
384 {
385 let table = normalize_table_name(table);
386 let default_msg = format!("config: update {table}");
387 let msg = message.unwrap_or(&default_msg);
388
389 let file_path = self.table_path(table);
391 let filename = file_path
392 .strip_prefix(&self.config.directory)
393 .unwrap_or(&file_path)
394 .to_string_lossy()
395 .to_string();
396
397 let commit_hash = crate::directory_config::git::git_add_and_commit(
398 &self.config.directory,
399 &filename,
400 msg,
401 &self.config.git_author_name,
402 &self.config.git_author_email,
403 )?;
404
405 if self.config.git_push {
406 crate::directory_config::git::git_push(&self.config.directory)?;
407 }
408
409 let branch = crate::directory_config::git::git_current_branch(&self.config.directory);
410
411 Ok((branch, Some(commit_hash)))
412 }
413
414 #[cfg(not(feature = "directory-config-git"))]
415 Ok((None, None))
416 }
417}
418
419pub(crate) fn validate_table_name(table: &str) -> DirectoryConfigResult<()> {
427 let trimmed = table.trim_matches('/');
428 if trimmed.is_empty() {
429 return Err(DirectoryConfigError::InvalidTableName(
430 "table name is empty".to_string(),
431 ));
432 }
433 if table.contains('\\') {
434 return Err(DirectoryConfigError::InvalidTableName(
435 "backslash not allowed".to_string(),
436 ));
437 }
438 for segment in trimmed.split('/') {
439 if segment.is_empty() {
440 return Err(DirectoryConfigError::InvalidTableName(
441 "empty path segment".to_string(),
442 ));
443 }
444 if segment == ".." {
445 return Err(DirectoryConfigError::InvalidTableName(
446 "path traversal not allowed".to_string(),
447 ));
448 }
449 if segment == "." {
450 return Err(DirectoryConfigError::InvalidTableName(
451 "current directory reference not allowed".to_string(),
452 ));
453 }
454 }
455 Ok(())
456}
457
/// Strips every leading and trailing `/` from `table`; inner separators are kept.
fn normalize_table_name(table: &str) -> &str {
    table.trim_start_matches('/').trim_end_matches('/')
}
462
463fn detect_write_mode(dir: &Path, git_enabled: bool) -> WriteMode {
465 let probe = dir.join(".dcs_write_probe");
467 match std::fs::File::create(&probe) {
468 Ok(_) => {
469 let _ = std::fs::remove_file(&probe);
470 }
471 Err(_) => return WriteMode::ReadOnly,
472 }
473
474 if git_enabled && dir.join(".git").exists() {
476 WriteMode::GitCommit
477 } else {
478 WriteMode::DirectWrite
479 }
480}
481
482pub(crate) async fn load_all_tables(
488 dir: &Path,
489 cache: &TableCache,
490 timestamps: &TimestampCache,
491) -> DirectoryConfigResult<()> {
492 let mut new_cache = HashMap::new();
493 let mut new_timestamps = HashMap::new();
494
495 load_tables_recursive(
496 dir,
497 dir,
498 cache,
499 timestamps,
500 &mut new_cache,
501 &mut new_timestamps,
502 )
503 .await?;
504
505 {
507 let mut cache_w = cache.write().await;
508 *cache_w = new_cache;
509 }
510 {
511 let mut ts_w = timestamps.write().await;
512 *ts_w = new_timestamps;
513 }
514
515 Ok(())
516}
517
518async fn load_tables_recursive(
520 root: &Path,
521 current: &Path,
522 cache: &TableCache,
523 timestamps: &TimestampCache,
524 new_cache: &mut HashMap<String, serde_yaml_ng::Value>,
525 new_timestamps: &mut HashMap<String, std::time::SystemTime>,
526) -> DirectoryConfigResult<()> {
527 let mut entries = tokio::fs::read_dir(current).await?;
528
529 while let Some(entry) = entries.next_entry().await? {
530 let path = entry.path();
531 let file_type = entry.file_type().await?;
532
533 if path
535 .file_name()
536 .and_then(|n| n.to_str())
537 .is_some_and(|n| n.starts_with('.'))
538 {
539 continue;
540 }
541
542 if file_type.is_dir() {
543 Box::pin(load_tables_recursive(
544 root,
545 &path,
546 cache,
547 timestamps,
548 new_cache,
549 new_timestamps,
550 ))
551 .await?;
552 continue;
553 }
554
555 let ext = path.extension().and_then(|e| e.to_str());
556 if !matches!(ext, Some("yaml" | "yml")) {
557 continue;
558 }
559
560 let rel = path.strip_prefix(root).unwrap_or(&path);
562 let table_name = rel.with_extension("").to_string_lossy().replace('\\', "/");
563
564 if table_name.is_empty() {
565 continue;
566 }
567
568 let modified = entry
569 .metadata()
570 .await
571 .ok()
572 .and_then(|m| m.modified().ok())
573 .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
574
575 match tokio::fs::read_to_string(&path).await {
576 Ok(contents) => match serde_yaml_ng::from_str(&contents) {
577 Ok(value) => {
578 new_cache.insert(table_name.clone(), value);
579 new_timestamps.insert(table_name, modified);
580 }
581 Err(e) => {
582 tracing::warn!(
583 file = %path.display(),
584 error = %e,
585 "Corrupt YAML, keeping last known good"
586 );
587 let existing = cache.read().await;
588 if let Some(existing_value) = existing.get(&table_name) {
589 new_cache.insert(table_name.clone(), existing_value.clone());
590 }
591 if let Some(ts) = timestamps.read().await.get(&table_name) {
592 new_timestamps.insert(table_name, *ts);
593 }
594 }
595 },
596 Err(e) => {
597 tracing::warn!(
598 file = %path.display(),
599 error = %e,
600 "Failed to read file"
601 );
602 }
603 }
604 }
605
606 Ok(())
607}
608
609fn load_yaml_file(path: &Path) -> DirectoryConfigResult<serde_yaml_ng::Value> {
611 let contents = std::fs::read_to_string(path)?;
612 serde_yaml_ng::from_str(&contents).map_err(|e| DirectoryConfigError::ParseError {
613 file: path.display().to_string(),
614 message: e.to_string(),
615 })
616}
617
618fn write_yaml_locked(path: &Path, value: &serde_yaml_ng::Value) -> DirectoryConfigResult<()> {
628 use std::io::{Seek, SeekFrom, Write};
629
630 let yaml_str = serde_yaml_ng::to_string(value)
631 .map_err(|e| DirectoryConfigError::SerializationError(e.to_string()))?;
632
633 let mut file = std::fs::OpenOptions::new()
637 .write(true)
638 .create(true)
639 .truncate(false)
640 .open(path)?;
641
642 file.lock().map_err(DirectoryConfigError::IoError)?;
644
645 file.seek(SeekFrom::Start(0))?;
649 file.write_all(yaml_str.as_bytes())?;
650 file.set_len(yaml_str.len() as u64)?;
651 file.sync_data()?;
652
653 file.unlock().map_err(DirectoryConfigError::IoError)?;
654
655 Ok(())
656}
657
658fn navigate_yaml(value: &serde_yaml_ng::Value, key: &str) -> Option<serde_yaml_ng::Value> {
660 let parts: Vec<&str> = key.split('.').collect();
661 let mut current = value;
662
663 for part in &parts {
664 match current {
665 serde_yaml_ng::Value::Mapping(map) => {
666 let yaml_key = serde_yaml_ng::Value::String((*part).to_string());
667 current = map.get(&yaml_key)?;
668 }
669 _ => return None,
670 }
671 }
672
673 Some(current.clone())
674}
675
676fn set_yaml_key(doc: &mut serde_yaml_ng::Value, key: &str, value: serde_yaml_ng::Value) {
678 let parts: Vec<&str> = key.split('.').collect();
679 let mut current = doc;
680
681 for (i, part) in parts.iter().enumerate() {
682 let yaml_key = serde_yaml_ng::Value::String((*part).to_string());
683
684 if i == parts.len() - 1 {
685 if let serde_yaml_ng::Value::Mapping(map) = current {
687 map.insert(yaml_key, value);
688 return;
689 }
690 } else {
691 if !current.is_mapping() {
693 *current = serde_yaml_ng::Value::Mapping(serde_yaml_ng::Mapping::new());
694 }
695 let map = current.as_mapping_mut().unwrap();
696 if !map.contains_key(&yaml_key) {
697 map.insert(
698 yaml_key.clone(),
699 serde_yaml_ng::Value::Mapping(serde_yaml_ng::Mapping::new()),
700 );
701 }
702 current = map.get_mut(&yaml_key).unwrap();
703 }
704 }
705}
706
707fn remove_yaml_key(doc: &mut serde_yaml_ng::Value, key: &str) -> bool {
709 let parts: Vec<&str> = key.split('.').collect();
710
711 if parts.len() == 1 {
712 if let serde_yaml_ng::Value::Mapping(map) = doc {
713 let yaml_key = serde_yaml_ng::Value::String(parts[0].to_string());
714 return map.remove(&yaml_key).is_some();
715 }
716 return false;
717 }
718
719 let parent_parts = &parts[..parts.len() - 1];
721 let last_key = parts[parts.len() - 1];
722 let mut current = &mut *doc;
723
724 for part in parent_parts {
725 let yaml_key = serde_yaml_ng::Value::String((*part).to_string());
726 match current {
727 serde_yaml_ng::Value::Mapping(map) => {
728 if let Some(next) = map.get_mut(&yaml_key) {
729 current = next;
730 } else {
731 return false;
732 }
733 }
734 _ => return false,
735 }
736 }
737
738 if let serde_yaml_ng::Value::Mapping(map) = current {
739 let yaml_key = serde_yaml_ng::Value::String(last_key.to_string());
740 map.remove(&yaml_key).is_some()
741 } else {
742 false
743 }
744}
745
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a YAML string scalar.
    fn yaml_string(text: &str) -> serde_yaml_ng::Value {
        serde_yaml_ng::Value::String(text.to_string())
    }

    /// Document with one nested mapping (`database`) and one top-level scalar (`name`).
    fn sample_doc() -> serde_yaml_ng::Value {
        serde_yaml_ng::from_str("database:\n  host: localhost\n  port: 5432\nname: test\n")
            .unwrap()
    }

    #[test]
    fn test_navigate_yaml() {
        let doc = sample_doc();

        assert_eq!(navigate_yaml(&doc, "name"), Some(yaml_string("test")));
        assert_eq!(
            navigate_yaml(&doc, "database.host"),
            Some(yaml_string("localhost"))
        );
        assert!(navigate_yaml(&doc, "missing").is_none());
        assert!(navigate_yaml(&doc, "database.missing").is_none());
    }

    #[test]
    fn test_set_yaml_key() {
        let mut doc = serde_yaml_ng::Value::Mapping(serde_yaml_ng::Mapping::new());

        // Top-level key.
        set_yaml_key(&mut doc, "name", yaml_string("test"));
        assert_eq!(navigate_yaml(&doc, "name"), Some(yaml_string("test")));

        // Nested key whose parent mapping does not exist yet.
        set_yaml_key(&mut doc, "database.host", yaml_string("localhost"));
        assert_eq!(
            navigate_yaml(&doc, "database.host"),
            Some(yaml_string("localhost"))
        );
    }

    #[test]
    fn test_remove_yaml_key() {
        let mut doc = sample_doc();

        assert!(remove_yaml_key(&mut doc, "name"));
        assert!(navigate_yaml(&doc, "name").is_none());

        // Removing one nested key must leave its sibling in place.
        assert!(remove_yaml_key(&mut doc, "database.host"));
        assert!(navigate_yaml(&doc, "database.host").is_none());
        assert!(navigate_yaml(&doc, "database.port").is_some());

        assert!(!remove_yaml_key(&mut doc, "missing"));
    }

    #[test]
    fn test_detect_write_mode_readonly() {
        // The probe file cannot be created under `/proc` (or the path does not
        // exist at all), so the probe fails either way.
        let mode = detect_write_mode(Path::new("/proc"), false);
        assert_eq!(mode, WriteMode::ReadOnly);
    }

    #[test]
    fn test_detect_write_mode_writable() {
        let scratch = tempfile::tempdir().unwrap();
        assert_eq!(
            detect_write_mode(scratch.path(), false),
            WriteMode::DirectWrite
        );
    }

    #[test]
    fn test_validate_table_name_valid() {
        for name in ["dfe-loader", "loaders/dfe-loader", "a/b/c", "my_table"] {
            assert!(validate_table_name(name).is_ok(), "{name}");
        }
    }

    #[test]
    fn test_validate_table_name_rejects_traversal() {
        for name in ["../etc/passwd", "foo/../../bar", ".."] {
            assert!(validate_table_name(name).is_err(), "{name}");
        }
    }

    #[test]
    fn test_validate_table_name_rejects_backslash() {
        assert!(validate_table_name("foo\\bar").is_err());
    }

    #[test]
    fn test_validate_table_name_rejects_empty() {
        for name in ["", "/", "//"] {
            assert!(validate_table_name(name).is_err(), "{name:?}");
        }
    }

    #[test]
    fn test_validate_table_name_rejects_empty_segments() {
        assert!(validate_table_name("foo//bar").is_err());
    }

    #[test]
    fn test_validate_table_name_rejects_single_dot() {
        for name in [".", "./foo", "foo/./bar"] {
            assert!(validate_table_name(name).is_err(), "{name}");
        }
    }

    #[test]
    fn test_normalize_table_name() {
        let cases = [
            ("foo", "foo"),
            ("/foo/", "foo"),
            ("loaders/dfe", "loaders/dfe"),
            ("/loaders/dfe/", "loaders/dfe"),
        ];
        for (input, expected) in cases {
            assert_eq!(normalize_table_name(input), expected);
        }
    }
}