1use crate::access_controller::manifest::{CreateAccessControllerOptions, ManifestParams};
2use crate::access_controller::{
3 simple::SimpleAccessController, traits::AccessController,
4 traits::Option as AccessControllerOption,
5};
6use crate::error::{GuardianError, Result};
7use crate::traits::BaseGuardianDB;
8use cid::Cid;
9use std::str::FromStr;
10use std::sync::Arc;
11use tracing::{debug, error, info, instrument, warn};
12
13#[instrument(skip(db, params, _options), fields(controller_type = %controller_type))]
25pub async fn create(
26 db: Arc<dyn BaseGuardianDB<Error = GuardianError>>,
27 controller_type: &str,
28 params: CreateAccessControllerOptions,
29 _options: AccessControllerOption,
30) -> Result<Cid> {
31 info!(target: "access_controller_utils", controller_type = %controller_type, "Creating access controller");
32
33 let controller_type_normalized = controller_type.to_lowercase();
35 match controller_type_normalized.as_str() {
36 "simple" | "guardian" | "ipfs" => {}
37 _ => {
38 warn!(target: "access_controller_utils", controller_type = %controller_type, "Unknown access controller type");
39 return Err(GuardianError::Store(format!(
40 "Unknown access controller type: {}",
41 controller_type
42 )));
43 }
44 }
45
46 let controller = create_controller(
48 &controller_type_normalized,
49 params.clone(),
50 Some(db.ipfs().as_ref()),
51 Some(db.clone()),
52 )
53 .await?;
54
55 let _manifest_params = controller.save().await?;
57
58 let access_address = ensure_address(&controller_type_normalized);
60
61 debug!(target: "access_controller_utils",
62 controller_type = %controller_type,
63 address = %access_address,
64 "Access controller created successfully"
65 );
66
67 let ipfs_client = db.ipfs();
69
70 let manifest_cid = crate::access_controller::manifest::create(
72 ipfs_client,
73 controller_type_normalized,
74 ¶ms,
75 )
76 .await
77 .map_err(|e| {
78 error!(target: "access_controller_utils", error = %e, "Failed to create manifest in IPFS");
79 GuardianError::Store(format!(
80 "Failed to create access controller manifest: {}",
81 e
82 ))
83 })?;
84
85 info!(target: "access_controller_utils",
86 cid = %manifest_cid,
87 controller_type = %controller_type,
88 address = %access_address,
89 "Access controller manifest created in IPFS"
90 );
91
92 Ok(manifest_cid)
93}
94
95#[instrument(skip(db, params, _options), fields(manifest_address = %manifest_address))]
107pub async fn resolve(
108 db: Arc<dyn BaseGuardianDB<Error = GuardianError>>,
109 manifest_address: &str,
110 params: &CreateAccessControllerOptions,
111 _options: AccessControllerOption,
112) -> Result<Arc<dyn AccessController>> {
113 info!(target: "access_controller_utils", manifest_address = %manifest_address, "Resolving access controller");
114
115 let access_address = ensure_address(manifest_address);
117
118 if access_address.is_empty() {
120 return Err(GuardianError::Store(
121 "Manifest address cannot be empty".to_string(),
122 ));
123 }
124
125 debug!(target: "access_controller_utils", address = %access_address, "Loading access controller manifest");
126
127 let ipfs_client = db.ipfs();
129
130 let manifest_result =
132 crate::access_controller::manifest::resolve(ipfs_client, &access_address, params).await;
133
134 let controller_type = match manifest_result {
135 Ok(manifest) => {
136 debug!(target: "access_controller_utils",
137 controller_type = %manifest.get_type,
138 address = %access_address,
139 "Loaded controller type from IPFS manifest"
140 );
141 manifest.get_type
142 }
143 Err(e) => {
144 warn!(target: "access_controller_utils",
145 error = %e,
146 address = %access_address,
147 "Failed to load manifest from IPFS, falling back to inference"
148 );
149 infer_controller_type(&access_address, params)
151 }
152 };
153
154 debug!(target: "access_controller_utils",
155 controller_type = %controller_type,
156 address = %access_address,
157 "Controller type determined"
158 );
159
160 let controller = create_controller(
162 &controller_type,
163 params.clone(),
164 Some(db.ipfs().as_ref()),
165 Some(db.clone()),
166 )
167 .await?;
168
169 if let Err(e) = controller.load(&access_address).await {
171 warn!(target: "access_controller_utils",
172 error = %e,
173 address = %access_address,
174 "Failed to load controller state, using defaults"
175 );
176 }
177
178 info!(target: "access_controller_utils",
179 controller_type = %controller_type,
180 address = %access_address,
181 "Access controller resolved successfully"
182 );
183
184 Ok(controller)
185}
186
/// Normalises `address` so that its final path segment is `_access`.
///
/// Surrounding whitespace is trimmed first. A blank address becomes
/// `"_access"`; an address whose last `/`-separated segment is already
/// `_access` is returned as trimmed; otherwise `_access` is appended, with a
/// `/` inserted unless the address already ends with one.
pub fn ensure_address(address: &str) -> String {
    const SUFFIX: &str = "_access";

    let trimmed = address.trim();
    if trimmed.is_empty() {
        return SUFFIX.to_string();
    }

    // Only the last path segment decides whether the suffix is already there.
    let last_segment = trimmed.rsplit('/').next();
    if last_segment == Some(SUFFIX) {
        return trimmed.to_string();
    }

    let separator = if trimmed.ends_with('/') { "" } else { "/" };
    format!("{}{}{}", trimmed, separator, SUFFIX)
}
216
217#[instrument(skip(params, ipfs_client, guardian_db))]
229async fn create_controller(
230 controller_type: &str,
231 params: CreateAccessControllerOptions,
232 ipfs_client: Option<&crate::ipfs_core_api::client::IpfsClient>,
233 guardian_db: Option<Arc<dyn BaseGuardianDB<Error = GuardianError>>>,
234) -> Result<Arc<dyn AccessController>> {
235 debug!(target: "access_controller_utils", controller_type = %controller_type, "Creating access controller instance");
236
237 match controller_type {
238 "simple" => {
239 let initial_keys = if params.get_all_access().is_empty() {
240 let mut default_permissions = std::collections::HashMap::new();
242 default_permissions.insert("write".to_string(), vec!["*".to_string()]);
243 Some(default_permissions)
244 } else {
245 Some(params.get_all_access())
246 };
247 let controller = SimpleAccessController::new(initial_keys);
248 Ok(Arc::new(controller) as Arc<dyn AccessController>)
249 }
250 "ipfs" => {
251 debug!(target: "access_controller_utils", "Creating IpfsAccessController");
252
253 let ipfs_client = ipfs_client.ok_or_else(|| {
255 GuardianError::Store("IPFS client is required for IpfsAccessController".to_string())
256 })?;
257
258 let identity_id = if let Some(write_keys) = params.get_access("write") {
260 if !write_keys.is_empty() {
261 write_keys[0].clone()
262 } else {
263 "*".to_string()
264 }
265 } else {
266 "*".to_string()
267 };
268
269 debug!(target: "access_controller_utils",
270 identity_id = %identity_id,
271 "Creating IpfsAccessController with identity"
272 );
273
274 let controller = crate::access_controller::ipfs::IpfsAccessController::new(
276 Arc::new(ipfs_client.clone()),
277 identity_id,
278 params,
279 ).map_err(|e| {
280 error!(target: "access_controller_utils", error = %e, "Failed to create IpfsAccessController");
281 GuardianError::Store(format!("Failed to create IpfsAccessController: {}", e))
282 })?;
283
284 info!(target: "access_controller_utils", "IpfsAccessController created successfully");
285 Ok(Arc::new(controller) as Arc<dyn AccessController>)
286 }
287 "guardian" => {
288 debug!(target: "access_controller_utils", "Creating GuardianDBAccessController");
289
290 let guardian_db_instance = guardian_db.ok_or_else(|| {
292 GuardianError::Store(
293 "GuardianDB instance is required for GuardianDBAccessController".to_string(),
294 )
295 })?;
296
297 let kv_provider = GuardianDBAdapter::new(guardian_db_instance);
299
300 debug!(target: "access_controller_utils", "Creating GuardianDBAccessController with adapter");
301
302 let controller = crate::access_controller::guardian::GuardianDBAccessController::new(
304 Arc::new(kv_provider),
305 Box::new(params),
306 ).await.map_err(|e| {
307 error!(target: "access_controller_utils", error = %e, "Failed to create GuardianDBAccessController");
308 GuardianError::Store(format!("Failed to create GuardianDBAccessController: {}", e))
309 })?;
310
311 info!(target: "access_controller_utils", "GuardianDBAccessController created successfully");
312 Ok(Arc::new(controller) as Arc<dyn AccessController>)
313 }
314 _ => {
315 error!(target: "access_controller_utils", controller_type = %controller_type, "Unsupported access controller type");
316 Err(GuardianError::Store(format!(
317 "Unsupported access controller type: {}",
318 controller_type
319 )))
320 }
321 }
322}
323
324fn infer_controller_type(address: &str, params: &CreateAccessControllerOptions) -> String {
333 let explicit_type = params.get_type();
335 if !explicit_type.is_empty() {
336 return explicit_type.to_string();
337 }
338 if address.contains("/guardian/") || address.contains("guardian_") {
340 return "guardian".to_string();
341 }
342 if address.contains("/ipfs/") || address.contains("ipfs_") {
343 return "ipfs".to_string();
344 }
345 "simple".to_string()
347}
348
349pub fn validate_address(address: &str) -> Result<()> {
358 if address.trim().is_empty() {
359 return Err(GuardianError::Store("Address cannot be empty".to_string()));
360 }
361 if address.contains("..") || address.contains("//") {
363 return Err(GuardianError::Store(
364 "Address contains invalid path components".to_string(),
365 ));
366 }
367 if address.len() > 1000 {
369 return Err(GuardianError::Store(
370 "Address is too long (max 1000 characters)".to_string(),
371 ));
372 }
373
374 Ok(())
375}
376
/// Returns the controller type names this module recognises, in the fixed
/// order `simple`, `guardian`, `ipfs`.
pub fn list_available_types() -> Vec<String> {
    ["simple", "guardian", "ipfs"]
        .iter()
        .map(|name| name.to_string())
        .collect()
}
388
389pub fn is_supported_type(controller_type: &str) -> bool {
397 list_available_types().contains(&controller_type.to_lowercase())
398}
399
400pub struct GuardianDBAdapter {
402 base_db: Arc<dyn BaseGuardianDB<Error = GuardianError>>,
403}
404
405impl GuardianDBAdapter {
406 pub fn new(base_db: Arc<dyn BaseGuardianDB<Error = GuardianError>>) -> Self {
407 Self { base_db }
408 }
409}
410
411#[async_trait::async_trait]
412impl crate::traits::GuardianDBKVStoreProvider for GuardianDBAdapter {
413 type Error = GuardianError;
414
415 async fn key_value(
416 &self,
417 address: &str,
418 options: &mut crate::traits::CreateDBOptions,
419 ) -> std::result::Result<
420 Box<dyn crate::traits::KeyValueStore<Error = GuardianError>>,
421 Self::Error,
422 > {
423 let store = self.base_db.create(address, "keyvalue", options).await?;
425
426 Ok(Box::new(KeyValueStoreAdapter::new(store)))
428 }
429}
430
431pub struct KeyValueStoreAdapter {
433 store: Arc<dyn crate::traits::Store<Error = GuardianError>>,
434}
435
436impl KeyValueStoreAdapter {
437 pub fn new(store: Arc<dyn crate::traits::Store<Error = GuardianError>>) -> Self {
438 Self { store }
439 }
440}
441
// `Store` implementation for `KeyValueStoreAdapter`.
//
// Plain accessors forward to the wrapped store. The loading, syncing and
// operation methods work directly on the wrapped store's op log and IPFS
// client rather than calling the wrapped store's own implementations.
#[async_trait::async_trait]
impl crate::traits::Store for KeyValueStoreAdapter {
    type Error = GuardianError;

    /// Returns the wrapped store's address.
    fn address(&self) -> &dyn crate::address::Address {
        self.store.address()
    }

    /// Returns the wrapped store's type name.
    fn store_type(&self) -> &str {
        self.store.store_type()
    }

    /// Emits a `store_closed` JSON event on the wrapped store's event bus and
    /// logs the closure.
    ///
    /// Emission is best effort: failing to obtain an emitter, or a failed
    /// emit, is ignored, so as written this always returns `Ok(())`.
    ///
    /// NOTE(review): the wrapped store's own `close` is not called here —
    /// confirm that is intended.
    async fn close(&self) -> std::result::Result<(), Self::Error> {
        let event_bus = self.store.event_bus();

        // Payload: event name, store address, and the current UTC time in RFC 3339 form.
        let close_event = serde_json::json!({
            "event": "store_closed",
            "address": self.store.address().to_string(),
            "timestamp": chrono::Utc::now().to_rfc3339()
        });

        if let Ok(emitter) = event_bus.emitter::<serde_json::Value>().await {
            // The emit result is deliberately discarded.
            let _ = emitter.emit(close_event);
        }

        tracing::info!("Store adapter closed: {}", self.store.address());
        Ok(())
    }

    /// Calls `close` and then logs that the adapter was dropped.
    ///
    /// # Errors
    /// Propagates an error from `close`.
    async fn drop(&mut self) -> std::result::Result<(), Self::Error> {
        self.close().await?;

        let op_log = self.store.op_log();

        // NOTE(review): this only acquires the op-log write lock (if it is free)
        // and releases it again; nothing in the log is modified.
        if let Some(log_guard) = op_log.try_write() {
            drop(log_guard);
        }

        tracing::debug!("Store adapter dropped: {}", self.store.address());
        Ok(())
    }

    /// Not supported by this adapter.
    ///
    /// # Panics
    /// Always panics via `unimplemented!`; the message points callers to
    /// `event_bus()`.
    fn events(&self) -> &dyn crate::events::EmitterInterface {
        unimplemented!("events() is deprecated, use event_bus() instead")
    }

    /// Returns the wrapped store's index.
    fn index(&self) -> Box<dyn crate::traits::StoreIndex<Error = Self::Error> + Send + Sync> {
        self.store.index()
    }

    /// Returns the wrapped store's replication status.
    fn replication_status(&self) -> crate::stores::replicator::replication_info::ReplicationInfo {
        self.store.replication_status()
    }

    /// Returns the wrapped store's replicator, if any.
    fn replicator(&self) -> Option<Arc<crate::stores::replicator::replicator::Replicator>> {
        self.store.replicator()
    }

    /// Returns the wrapped store's cache datastore.
    fn cache(&self) -> Arc<dyn crate::data_store::Datastore> {
        self.store.cache()
    }

    /// Re-fetches the op log's current heads from IPFS and appends those the
    /// log does not already report via `has`, stopping after `amount` appends.
    ///
    /// A head is skipped silently if its hash does not parse as a CID, the
    /// IPFS fetch fails, or the fetched bytes are not a UTF-8 JSON `Entry`.
    /// As written this always returns `Ok(())`.
    async fn load(&mut self, amount: usize) -> std::result::Result<(), Self::Error> {
        let ipfs_client = self.store.ipfs();
        let op_log = self.store.op_log();

        let mut loaded_count = 0;

        // Copy the heads inside a block so the read guard is released before
        // the `.await` in the loop below.
        let heads = {
            let log_guard = op_log.read();
            log_guard.heads().clone()
        };

        for head_entry in heads {
            if loaded_count >= amount {
                break;
            }

            if let Ok(head_cid) = cid::Cid::from_str(head_entry.hash())
                && let Ok(data) = ipfs_client.dag_get(&head_cid, None).await
            {
                if let Ok(entry_str) = std::str::from_utf8(&data)
                    && let Ok(entry) =
                        serde_json::from_str::<crate::ipfs_log::entry::Entry>(entry_str)
                {
                    let entry_hash = entry.hash();
                    // Check and append under the same write guard.
                    {
                        let mut log_guard = op_log.write();
                        if !log_guard.has(entry_hash) {
                            // The fetched JSON text is what gets appended.
                            log_guard.append(entry_str, None);
                            loaded_count += 1;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Merges the given head entries, and the entries reachable through their
    /// `next` links, into the op log.
    ///
    /// Heads already reported by `has` are skipped. Otherwise the `next` links
    /// are followed by fetching each referenced entry from IPFS; links that
    /// fail to parse, fetch or decode are skipped silently. As written this
    /// always returns `Ok(())`.
    async fn sync(
        &mut self,
        heads: Vec<crate::ipfs_log::entry::Entry>,
    ) -> std::result::Result<(), Self::Error> {
        let op_log = self.store.op_log();
        let ipfs_client = self.store.ipfs();

        for head_entry in heads {
            // Short-lived read guard: skip heads the log already has.
            {
                let log_guard = op_log.read();
                if log_guard.has(head_entry.hash()) {
                    continue;
                }
            }

            let mut entries_to_add = Vec::new();
            // Work list of entries still to expand; items are popped from the back.
            let mut queue = vec![head_entry.clone()];

            while let Some(entry) = queue.pop() {
                entries_to_add.push(entry.clone());

                for next_hash in &entry.next {
                    if let Ok(next_cid) = next_hash.parse::<cid::Cid>()
                        && let Ok(data) = ipfs_client.dag_get(&next_cid, None).await
                        && let Ok(entry_str) = std::str::from_utf8(&data)
                        && let Ok(parent_entry) =
                            serde_json::from_str::<crate::ipfs_log::entry::Entry>(entry_str)
                    {
                        // Only entries missing from the log are expanded further.
                        let log_guard = op_log.read();
                        if !log_guard.has(parent_entry.hash()) {
                            drop(log_guard);
                            queue.push(parent_entry);
                        }
                    }
                }
            }

            // Append in reverse discovery order, so entries reached through
            // `next` links go in before the entry that referenced them.
            {
                let mut log_guard = op_log.write();
                for entry in entries_to_add.iter().rev() {
                    // NOTE(review): a serialization failure yields an empty string
                    // here, which is then appended if the entry is missing.
                    let entry_json = serde_json::to_string(entry).unwrap_or_default();
                    if !log_guard.has(entry.hash()) {
                        log_guard.append(&entry_json, None);
                    }
                }
            }
        }

        Ok(())
    }

    /// Fetches the entries referenced by the `next` links of `entries` from
    /// IPFS and appends those missing from the op log, up to `amount` appends.
    ///
    /// Links that fail to parse, fetch or decode are skipped silently.
    async fn load_more_from(&mut self, amount: u64, entries: Vec<crate::ipfs_log::entry::Entry>) {
        let op_log = self.store.op_log();
        let ipfs_client = self.store.ipfs();
        let mut loaded_count = 0u64;

        for entry in entries {
            if loaded_count >= amount {
                break;
            }

            for next_hash in &entry.next {
                if loaded_count >= amount {
                    break;
                }

                if let Ok(next_cid) = next_hash.parse::<cid::Cid>()
                    && let Ok(data) = ipfs_client.dag_get(&next_cid, None).await
                    && let Ok(entry_str) = std::str::from_utf8(&data)
                    && let Ok(parent_entry) =
                        serde_json::from_str::<crate::ipfs_log::entry::Entry>(entry_str)
                {
                    // The read guard is released before the write attempt below.
                    let should_add = {
                        let log_guard = op_log.read();
                        !log_guard.has(parent_entry.hash())
                    };

                    if should_add {
                        // NOTE(review): if the write lock is not immediately
                        // available the entry is skipped and not counted.
                        if let Some(mut log_guard) = op_log.try_write() {
                            log_guard.append(entry_str, None);
                            loaded_count += 1;
                        }
                    }
                }
            }
        }
    }

    /// Reads `<store address>/snapshot` from IPFS, decodes it as a JSON array
    /// of entries, and appends every entry the op log does not already have.
    ///
    /// Returns `Ok(())` in every case, including when the snapshot cannot be
    /// read or decoded.
    async fn load_from_snapshot(&mut self) -> std::result::Result<(), Self::Error> {
        let ipfs_client = self.store.ipfs();
        let op_log = self.store.op_log();
        let store_address = self.store.address();

        let snapshot_path = format!("{}/snapshot", store_address);

        if let Ok(mut reader) = ipfs_client.cat(&snapshot_path).await {
            use tokio::io::AsyncReadExt;
            let mut snapshot_data = Vec::new();
            if reader.read_to_end(&mut snapshot_data).await.is_ok()
                && let Ok(snapshot_str) = std::str::from_utf8(&snapshot_data)
                && let Ok(snapshot) =
                    serde_json::from_str::<Vec<crate::ipfs_log::entry::Entry>>(snapshot_str)
            {
                // One write guard covers the whole batch.
                let mut log_guard = op_log.write();
                for entry in &snapshot {
                    if !log_guard.has(entry.hash()) {
                        // NOTE(review): a serialization failure appends an empty string.
                        let entry_json = serde_json::to_string(entry).unwrap_or_default();
                        log_guard.append(&entry_json, None);
                    }
                }

                drop(log_guard);

                // The count logged is the snapshot length, not the number appended.
                tracing::info!(
                    "Successfully loaded {} entries from snapshot",
                    snapshot.len()
                );

                return Ok(());
            }
        }

        Ok(())
    }

    /// Returns the wrapped store's op log.
    fn op_log(&self) -> Arc<parking_lot::RwLock<crate::ipfs_log::log::Log>> {
        self.store.op_log()
    }

    /// Returns the wrapped store's IPFS client.
    fn ipfs(&self) -> Arc<crate::ipfs_core_api::client::IpfsClient> {
        self.store.ipfs()
    }

    /// Returns the wrapped store's database name.
    fn db_name(&self) -> &str {
        self.store.db_name()
    }

    /// Returns the wrapped store's identity.
    fn identity(&self) -> &crate::ipfs_log::identity::Identity {
        self.store.identity()
    }

    /// Returns the wrapped store's access controller.
    fn access_controller(&self) -> &dyn crate::access_controller::traits::AccessController {
        self.store.access_controller()
    }

    /// Wraps `op` in a new log entry, stores the entry in IPFS, appends it to
    /// the op log, and returns it.
    ///
    /// The entry is built from a clone of the store identity, the database
    /// name, the JSON-serialized operation as payload, and the current heads
    /// as its `next` references. If `on_progress_callback` is given, a clone
    /// of the entry is sent to it; a failed send is logged and otherwise
    /// ignored.
    ///
    /// # Errors
    /// Returns `GuardianError::Store` when the operation or entry cannot be
    /// serialized, the payload is not valid UTF-8, or the IPFS `dag_put`
    /// fails.
    async fn add_operation(
        &mut self,
        op: crate::stores::operation::operation::Operation,
        on_progress_callback: Option<crate::traits::ProgressCallback>,
    ) -> std::result::Result<crate::ipfs_log::entry::Entry, Self::Error> {
        let op_log = self.store.op_log();
        let identity = self.store.identity();
        let ipfs_client = self.store.ipfs();

        let payload = serde_json::to_vec(&op)
            .map_err(|e| GuardianError::Store(format!("Failed to serialize operation: {}", e)))?;

        // Read the heads in a block so the guard is gone before the `.await` below.
        let heads = {
            let log_guard = op_log.read();
            log_guard.heads()
        };

        let payload_str = std::str::from_utf8(&payload)
            .map_err(|e| GuardianError::Store(format!("Invalid UTF-8 in payload: {}", e)))?;

        let store_id = self.store.db_name();
        let next_hashes: Vec<crate::ipfs_log::entry::EntryOrHash> = heads
            .iter()
            .map(|entry| crate::ipfs_log::entry::EntryOrHash::Entry(entry.as_ref()))
            .collect();

        let entry = crate::ipfs_log::entry::Entry::new(
            identity.clone(),
            store_id,
            payload_str,
            &next_hashes,
            None,
        );

        let entry_data = serde_json::to_vec(&entry)
            .map_err(|e| GuardianError::Store(format!("Failed to serialize entry: {}", e)))?;

        // The returned CID is not used further.
        let _entry_cid = ipfs_client
            .dag_put(&entry_data)
            .await
            .map_err(|e| GuardianError::Store(format!("Failed to store entry in IPFS: {}", e)))?;

        {
            let mut log_guard = op_log.write();
            let entry_json = serde_json::to_string(&entry).map_err(|e| {
                GuardianError::Store(format!("Failed to serialize entry for log: {}", e))
            })?;
            log_guard.append(&entry_json, None);
        }

        if let Some(callback) = on_progress_callback {
            if (callback.send(entry.clone()).await).is_err() {
                tracing::warn!("Failed to send progress callback");
            }
        }

        Ok(entry)
    }

    /// Returns the wrapped store's tracing span.
    fn span(&self) -> Arc<tracing::Span> {
        self.store.span()
    }

    /// Returns the wrapped store's tracer.
    fn tracer(&self) -> Arc<crate::traits::TracerWrapper> {
        self.store.tracer()
    }

    /// Returns the wrapped store's event bus.
    fn event_bus(&self) -> Arc<crate::p2p::events::EventBus> {
        self.store.event_bus()
    }

    /// Returns this adapter (not the wrapped store) as `Any`.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
809
810#[async_trait::async_trait]
811impl crate::traits::KeyValueStore for KeyValueStoreAdapter {
812 async fn put(
813 &mut self,
814 key: &str,
815 value: Vec<u8>,
816 ) -> std::result::Result<crate::stores::operation::operation::Operation, Self::Error> {
817 let operation = crate::stores::operation::operation::Operation::new(
819 Some(key.to_string()),
820 "PUT".to_string(),
821 Some(value),
822 );
823
824 let op_log = self.store.op_log();
827 let entry_data = serde_json::to_string(&operation)
828 .map_err(|e| GuardianError::Store(format!("Failed to serialize operation: {}", e)))?;
829
830 {
831 let mut log_guard = op_log.write();
832 log_guard.append(&entry_data, None);
833 }
834
835 Ok(operation)
836 }
837
838 async fn get(&self, key: &str) -> std::result::Result<Option<Vec<u8>>, Self::Error> {
839 let op_log = self.store.op_log();
841 let log_guard = op_log.read();
842
843 for entry in log_guard.values().into_iter().rev() {
845 let operation_data = entry.payload();
846 if let Ok(operation) = serde_json::from_str::<
848 crate::stores::operation::operation::Operation,
849 >(operation_data)
850 && let Some(op_key) = &operation.key
851 {
852 if op_key == key && operation.op == "PUT" {
853 return Ok(Some(operation.value));
854 } else if op_key == key && operation.op == "DELETE" {
855 return Ok(None);
856 }
857 }
858 }
859
860 Ok(None)
861 }
862
863 async fn delete(
864 &mut self,
865 key: &str,
866 ) -> std::result::Result<crate::stores::operation::operation::Operation, Self::Error> {
867 let operation = crate::stores::operation::operation::Operation::new(
869 Some(key.to_string()),
870 "DELETE".to_string(),
871 None,
872 );
873
874 let op_log = self.store.op_log();
877 let entry_data = serde_json::to_string(&operation)
878 .map_err(|e| GuardianError::Store(format!("Failed to serialize operation: {}", e)))?;
879
880 {
881 let mut log_guard = op_log.write();
882 log_guard.append(&entry_data, None);
883 }
884
885 Ok(operation)
886 }
887
888 fn all(&self) -> std::collections::HashMap<String, Vec<u8>> {
889 let mut result = std::collections::HashMap::new();
891 let op_log = self.store.op_log();
892 let log_guard = op_log.read();
893
894 for entry in log_guard.values() {
896 let operation_data = entry.payload();
897 if let Ok(operation) = serde_json::from_str::<
899 crate::stores::operation::operation::Operation,
900 >(operation_data)
901 && let Some(key) = &operation.key
902 {
903 match operation.op.as_str() {
904 "PUT" => {
905 result.insert(key.clone(), operation.value);
906 }
907 "DELETE" => {
908 result.remove(key);
909 }
910 _ => {} }
912 }
913 }
914
915 result
916 }
917}