1use crate::auth::Sig;
18use crate::fwid::{Key, compute_key, fw_check, fw_to_key};
19use crate::types::{
20 Device, DeviceId, Endpoint, Identity, IdentityHandle, MlDsaKeyPair, Presence, PresenceReceipt,
21 StorageHandle, StorageStrategy,
22};
23use anyhow::{Context, Result};
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28struct MockDht {
32 storage: HashMap<Key, Vec<u8>>,
33}
34
35impl MockDht {
36 fn new() -> Self {
37 Self {
38 storage: HashMap::new(),
39 }
40 }
41
42 async fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
43 self.storage.insert(key, value);
44 Ok(())
45 }
46
47 async fn get(&self, key: &Key) -> Result<Vec<u8>> {
48 self.storage
49 .get(key)
50 .cloned()
51 .ok_or_else(|| anyhow::anyhow!("Key not found"))
52 }
53}
54
55static DHT: once_cell::sync::Lazy<Arc<RwLock<MockDht>>> =
57 once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(MockDht::new())));
58
59static GLOBAL_DHT_CLIENT: once_cell::sync::OnceCell<Arc<crate::dht::client::DhtClient>> =
61 once_cell::sync::OnceCell::new();
62
63pub fn set_dht_client(client: crate::dht::client::DhtClient) -> bool {
65 GLOBAL_DHT_CLIENT.set(Arc::new(client)).is_ok()
66}
67
68fn get_dht_client() -> Option<Arc<crate::dht::client::DhtClient>> {
69 GLOBAL_DHT_CLIENT.get().cloned()
70}
71
72async fn dht_put_bytes(key: &Key, value: Vec<u8>) -> Result<()> {
73 if let Some(client) = get_dht_client() {
74 let k = hex::encode(key.as_bytes());
75 let _ = client
76 .put(k, value)
77 .await
78 .context("Failed to store data in DHT client")?;
79 Ok(())
80 } else {
81 let mut dht = DHT.write().await;
82 dht.put(key.clone(), value).await
83 }
84}
85
86async fn dht_get_bytes(key: &Key) -> Result<Vec<u8>> {
87 if let Some(client) = get_dht_client() {
88 let k = hex::encode(key.as_bytes());
89 match client.get(k).await.context("DHT get failed")? {
90 Some(v) => Ok(v),
91 None => anyhow::bail!("Key not found"),
92 }
93 } else {
94 let dht = DHT.read().await;
95 dht.get(key).await
96 }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct IdentityPacketV1 {
106 pub v: u8,
107 pub words: [String; 4],
108 pub id: Key,
109 pub pk: Vec<u8>,
110 pub sig: Option<Vec<u8>>, pub device_set_root: Key,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct MemberRef {
117 pub member_id: Key,
118 pub member_pk: Vec<u8>,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct GroupIdentityPacketV1 {
124 pub v: u8,
125 pub words: [String; 4],
126 pub id: Key,
127 pub group_pk: Vec<u8>,
128 pub group_sig: Vec<u8>,
129 pub members: Vec<MemberRef>,
130 pub membership_root: Key,
131 pub created_at: u64,
132 pub mls_ciphersuite: Option<u16>,
133}
134
135#[derive(Clone)]
137pub struct GroupKeyPair {
138 pub group_pk: crate::quantum_crypto::MlDsaPublicKey,
139 pub group_sk: crate::quantum_crypto::MlDsaSecretKey,
140}
141
142impl std::fmt::Debug for GroupKeyPair {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 write!(
145 f,
146 "GroupKeyPair {{ group_pk: <{} bytes>, group_sk: <hidden> }}",
147 self.group_pk.as_bytes().len()
148 )
149 }
150}
151
152pub async fn register_identity(words: [&str; 4], keypair: &MlDsaKeyPair) -> Result<IdentityHandle> {
165 let words_owned: [String; 4] = [
167 words[0].to_string(),
168 words[1].to_string(),
169 words[2].to_string(),
170 words[3].to_string(),
171 ];
172
173 if !fw_check(words_owned.clone()) {
175 anyhow::bail!("Invalid word in identity");
176 }
177
178 let key = fw_to_key(words_owned.clone())?;
180
181 let dht = DHT.read().await;
183 if dht.get(&key).await.is_ok() {
184 anyhow::bail!("Identity already registered");
185 }
186 drop(dht);
187
188 let identity = Identity {
190 words: words_owned.clone(),
191 key: key.clone(),
192 public_key: keypair.public_key.clone(),
193 };
194
195 let packet = IdentityPacketV1 {
196 v: 1,
197 words: words_owned.clone(),
198 id: key.clone(),
199 pk: keypair.public_key.clone(),
200 sig: None,
201 device_set_root: compute_key("device-set", key.as_bytes()),
202 };
203
204 dht_put_bytes(&key, serde_json::to_vec(&packet)?).await?;
205
206 Ok(IdentityHandle::new(identity, keypair.clone()))
207}
208
209pub async fn get_identity(key: Key) -> Result<Identity> {
217 let data = dht_get_bytes(&key).await.context("Identity not found")?;
219 if let Ok(pkt) = serde_json::from_slice::<IdentityPacketV1>(&data) {
220 let identity = Identity {
221 words: pkt.words,
222 key: pkt.id,
223 public_key: pkt.pk,
224 };
225 return Ok(identity);
226 }
227 let identity: Identity = serde_json::from_slice(&data)?;
229 Ok(identity)
230}
231
232pub async fn identity_fetch(key: Key) -> Result<IdentityPacketV1> {
234 let data = dht_get_bytes(&key).await.context("Identity not found")?;
235 let pkt: IdentityPacketV1 = serde_json::from_slice(&data)?;
236 Ok(pkt)
237}
238
239pub async fn register_presence(
253 handle: &IdentityHandle,
254 devices: Vec<Device>,
255 active_device: DeviceId,
256) -> Result<PresenceReceipt> {
257 if !devices.iter().any(|d| d.id == active_device) {
259 anyhow::bail!("Active device not in device list");
260 }
261
262 let presence = Presence {
264 identity: handle.key(),
265 devices,
266 active_device: Some(active_device),
267 timestamp: std::time::SystemTime::now()
268 .duration_since(std::time::UNIX_EPOCH)?
269 .as_secs(),
270 signature: vec![], };
272
273 let presence_bytes = serde_json::to_vec(&presence)?;
275 let signature = handle.sign(&presence_bytes)?;
276
277 let mut signed_presence = presence;
278 signed_presence.signature = signature;
279
280 let presence_key = derive_presence_key(handle.key());
282 let mut dht = DHT.write().await;
283 dht.put(presence_key, serde_json::to_vec(&signed_presence)?)
284 .await?;
285
286 let receipt = PresenceReceipt {
288 identity: handle.key(),
289 timestamp: signed_presence.timestamp,
290 storing_nodes: vec![Key::from([0u8; 32])], };
292
293 Ok(receipt)
294}
295
296pub async fn get_presence(identity_key: Key) -> Result<Presence> {
304 let presence_key = derive_presence_key(identity_key);
305 let dht = DHT.read().await;
306 let data = dht.get(&presence_key).await.context("Presence not found")?;
307 let presence: Presence = serde_json::from_slice(&data)?;
308 Ok(presence)
309}
310
311pub async fn register_headless(
321 handle: &IdentityHandle,
322 storage_gb: u32,
323 endpoint: Endpoint,
324) -> Result<DeviceId> {
325 let mut presence = get_presence(handle.key()).await?;
327
328 let device = Device {
330 id: DeviceId::generate(),
331 device_type: crate::types::presence::DeviceType::Headless,
332 storage_gb: storage_gb as u64,
333 endpoint,
334 capabilities: crate::types::presence::DeviceCapabilities {
335 storage_bytes: storage_gb as u64 * 1_000_000_000,
336 always_online: true,
337 supports_fec: true,
338 supports_seal: true,
339 ..Default::default()
340 },
341 };
342
343 let device_id = device.id;
344 presence.devices.push(device);
345
346 let active = presence.active_device.unwrap_or(device_id);
348 register_presence(handle, presence.devices, active).await?;
349
350 Ok(device_id)
351}
352
353pub async fn set_active_device(handle: &IdentityHandle, device_id: DeviceId) -> Result<()> {
359 let presence = get_presence(handle.key()).await?;
361
362 if !presence.devices.iter().any(|d| d.id == device_id) {
364 anyhow::bail!("Device not found in presence");
365 }
366
367 register_presence(handle, presence.devices, device_id).await?;
369 Ok(())
370}
371
372pub async fn store_data(
386 handle: &IdentityHandle,
387 data: Vec<u8>,
388 group_size: usize,
389) -> Result<StorageHandle> {
390 let strategy = StorageStrategy::from_group_size(group_size);
392
393 match strategy {
394 StorageStrategy::Direct => store_direct(handle, data).await,
395 StorageStrategy::FullReplication { replicas } => {
396 store_replicated(handle, data, replicas).await
397 }
398 StorageStrategy::FecEncoded {
399 data_shards,
400 parity_shards,
401 ..
402 } => store_with_fec(handle, data, data_shards, parity_shards).await,
403 }
404}
405
406pub async fn store_dyad(
416 handle1: &IdentityHandle,
417 _handle2_key: Key,
418 data: Vec<u8>,
419) -> Result<StorageHandle> {
420 store_replicated(handle1, data, 2).await
422}
423
424pub async fn store_with_fec(
435 handle: &IdentityHandle,
436 data: Vec<u8>,
437 data_shards: usize,
438 parity_shards: usize,
439) -> Result<StorageHandle> {
440 let storage_id = Key::from(*blake3::hash(&data).as_bytes());
442
443 let mut shard_map = crate::types::storage::ShardMap::new();
448
449 let presence = get_presence(handle.key()).await?;
451
452 let mut headless_devices: Vec<_> = presence
454 .devices
455 .iter()
456 .filter(|d| d.device_type == crate::types::presence::DeviceType::Headless)
457 .collect();
458 let mut active_devices: Vec<_> = presence
459 .devices
460 .iter()
461 .filter(|d| d.device_type == crate::types::presence::DeviceType::Active)
462 .collect();
463 let mobile_devices: Vec<_> = presence
464 .devices
465 .iter()
466 .filter(|d| d.device_type == crate::types::presence::DeviceType::Mobile)
467 .collect();
468
469 headless_devices.sort_by(|a, b| b.storage_gb.cmp(&a.storage_gb));
471 active_devices.sort_by(|a, b| b.storage_gb.cmp(&a.storage_gb));
472
473 let total_shards = data_shards + parity_shards;
478 let mut shard_idx = 0u32;
479
480 let headless_count = headless_devices.len();
482 if headless_count > 0 {
483 let min_headless_shards = (total_shards * 3).div_ceil(5); let shards_per_headless = min_headless_shards.div_ceil(headless_count);
486
487 for device in &headless_devices {
488 for _ in 0..shards_per_headless {
489 if (shard_idx as usize) < total_shards {
490 shard_map.assign_shard(device.id, shard_idx);
491 shard_idx += 1;
492 }
493 }
494 }
495 }
496
497 for device in &active_devices {
499 if (shard_idx as usize) < total_shards {
500 shard_map.assign_shard(device.id, shard_idx);
501 shard_idx += 1;
502 }
503 }
504
505 if (shard_idx as usize) < total_shards
507 && headless_devices.is_empty()
508 && active_devices.is_empty()
509 {
510 for device in &mobile_devices {
511 if (shard_idx as usize) < total_shards {
512 shard_map.assign_shard(device.id, shard_idx);
513 shard_idx += 1;
514 }
515 }
516 }
517
518 while (shard_idx as usize) < total_shards {
520 let all_devices: Vec<_> = headless_devices
521 .iter()
522 .chain(active_devices.iter())
523 .collect();
524 if all_devices.is_empty() {
525 break;
526 }
527 let device = all_devices[(shard_idx as usize) % all_devices.len()];
528 shard_map.assign_shard(device.id, shard_idx);
529 shard_idx += 1;
530 }
531
532 let mut dht = DHT.write().await;
534 dht.put(storage_id.clone(), data.clone()).await?;
535
536 let handle = StorageHandle {
538 id: storage_id,
539 size: data.len() as u64,
540 strategy: StorageStrategy::FecEncoded {
541 data_shards,
542 parity_shards,
543 shard_size: 65536,
544 },
545 shard_map,
546 sealed_key: Some(vec![0u8; 32]), };
548
549 Ok(handle)
550}
551
552pub async fn get_data(handle: &StorageHandle) -> Result<Vec<u8>> {
560 let dht = DHT.read().await;
564 let data = dht.get(&handle.id).await.context("Data not found")?;
565 Ok(data)
566}
567
568fn derive_presence_key(identity_key: Key) -> Key {
574 let mut hasher = blake3::Hasher::new();
575 hasher.update(b"presence:");
576 hasher.update(identity_key.as_bytes());
577 Key::from(*hasher.finalize().as_bytes())
578}
579
580async fn store_direct(handle: &IdentityHandle, data: Vec<u8>) -> Result<StorageHandle> {
582 let storage_id = Key::from(*blake3::hash(&data).as_bytes());
583
584 let presence = get_presence(handle.key()).await?;
586 let device = presence.devices.first().context("No devices available")?;
587 let device_id = device.id;
588
589 let mut dht = DHT.write().await;
591 dht.put(storage_id.clone(), data.clone()).await?;
592 drop(dht); let mut shard_map = crate::types::storage::ShardMap::new();
595 shard_map.assign_shard(device_id, 0);
596
597 Ok(StorageHandle {
598 id: storage_id,
599 size: data.len() as u64,
600 strategy: StorageStrategy::Direct,
601 shard_map,
602 sealed_key: Some(vec![0u8; 32]),
603 })
604}
605
606pub fn group_identity_canonical_sign_bytes(id: &Key, membership_root: &Key) -> Vec<u8> {
612 let mut out = Vec::with_capacity(16 + 32 + 32);
613 out.extend_from_slice(b"saorsa-group:identity:v1");
614 out.extend_from_slice(id.as_bytes());
615 out.extend_from_slice(membership_root.as_bytes());
616 out
617}
618
619fn compute_membership_root(members: &[MemberRef]) -> Key {
620 let mut ids: Vec<[u8; 32]> = members.iter().map(|m| *m.member_id.as_bytes()).collect();
621 ids.sort_unstable();
622 let mut hasher = blake3::Hasher::new();
623 for id in ids {
624 hasher.update(&id);
625 }
626 Key::from(*hasher.finalize().as_bytes())
627}
628
629pub fn group_identity_create(
631 words: [String; 4],
632 members: Vec<MemberRef>,
633) -> Result<(GroupIdentityPacketV1, GroupKeyPair)> {
634 if !fw_check(words.clone()) {
636 anyhow::bail!("Invalid group words");
637 }
638 let id = fw_to_key(words.clone())?;
639
640 use crate::quantum_crypto::{MlDsa65, MlDsaOperations};
642 let ml = MlDsa65::new();
643 let (group_pk, group_sk) = ml
644 .generate_keypair()
645 .map_err(|e| anyhow::anyhow!("group keypair generation failed: {e:?}"))?;
646
647 let membership_root = compute_membership_root(&members);
649 let msg = group_identity_canonical_sign_bytes(&id, &membership_root);
650 let sig = ml
651 .sign(&group_sk, &msg)
652 .map_err(|e| anyhow::anyhow!("group sign failed: {e:?}"))?;
653
654 let pkt = GroupIdentityPacketV1 {
655 v: 1,
656 words,
657 id: id.clone(),
658 group_pk: group_pk.as_bytes().to_vec(),
659 group_sig: sig.0.to_vec(),
660 members,
661 membership_root,
662 created_at: std::time::SystemTime::now()
663 .duration_since(std::time::UNIX_EPOCH)
664 .unwrap_or_default()
665 .as_secs(),
666 mls_ciphersuite: None,
667 };
668
669 Ok((pkt, GroupKeyPair { group_pk, group_sk }))
670}
671
672pub async fn group_identity_publish(packet: GroupIdentityPacketV1) -> Result<()> {
674 let root = compute_membership_root(&packet.members);
676 if root != packet.membership_root {
677 anyhow::bail!("membership_root mismatch");
678 }
679 use crate::quantum_crypto::{MlDsa65, MlDsaOperations, MlDsaPublicKey, MlDsaSignature};
681 const SIG_LEN: usize = 3309;
682 if packet.group_sig.len() != SIG_LEN {
683 anyhow::bail!("invalid signature length");
684 }
685 let mut sig_arr = [0u8; SIG_LEN];
686 sig_arr.copy_from_slice(&packet.group_sig);
687 let sig = MlDsaSignature(Box::new(sig_arr));
688 let pk = MlDsaPublicKey::from_bytes(&packet.group_pk)
689 .map_err(|_| anyhow::anyhow!("invalid group_pk"))?;
690 let ml = MlDsa65::new();
691 let msg = group_identity_canonical_sign_bytes(&packet.id, &packet.membership_root);
692 let ok = ml
693 .verify(&pk, &msg, &sig)
694 .map_err(|e| anyhow::anyhow!("verify failed: {e:?}"))?;
695 if !ok {
696 anyhow::bail!("group signature invalid");
697 }
698 dht_put_bytes(&packet.id, serde_json::to_vec(&packet)?).await
699}
700
701pub async fn group_identity_fetch(id_key: Key) -> Result<GroupIdentityPacketV1> {
703 let data = dht_get_bytes(&id_key).await.context("Group not found")?;
704 let pkt: GroupIdentityPacketV1 = serde_json::from_slice(&data)?;
705 Ok(pkt)
706}
707
708pub async fn group_identity_update_members_signed(
710 id_key: Key,
711 new_members: Vec<MemberRef>,
712 group_pk: Vec<u8>,
713 group_sig: Sig,
714) -> Result<()> {
715 let new_root = compute_membership_root(&new_members);
717 use crate::quantum_crypto::{MlDsa65, MlDsaOperations, MlDsaPublicKey, MlDsaSignature};
718 const SIG_LEN: usize = 3309;
719 let sig_bytes = group_sig.as_bytes();
720 if sig_bytes.len() != SIG_LEN {
721 anyhow::bail!("invalid signature length");
722 }
723 let mut sig_arr = [0u8; SIG_LEN];
724 sig_arr.copy_from_slice(sig_bytes);
725 let sig = MlDsaSignature(Box::new(sig_arr));
726 let pk =
727 MlDsaPublicKey::from_bytes(&group_pk).map_err(|_| anyhow::anyhow!("invalid group_pk"))?;
728 let ml = MlDsa65::new();
729 let msg = group_identity_canonical_sign_bytes(&id_key, &new_root);
730 let ok = ml
731 .verify(&pk, &msg, &sig)
732 .map_err(|e| anyhow::anyhow!("verify failed: {e:?}"))?;
733 if !ok {
734 anyhow::bail!("group signature invalid");
735 }
736
737 let mut pkt = match group_identity_fetch(id_key.clone()).await {
739 Ok(p) => p,
740 Err(_) => GroupIdentityPacketV1 {
741 v: 1,
742 words: [String::new(), String::new(), String::new(), String::new()],
743 id: id_key.clone(),
744 group_pk: group_pk.clone(),
745 group_sig: sig.0.clone().to_vec(),
746 members: Vec::new(),
747 membership_root: new_root.clone(),
748 created_at: std::time::SystemTime::now()
749 .duration_since(std::time::UNIX_EPOCH)
750 .unwrap_or_default()
751 .as_secs(),
752 mls_ciphersuite: None,
753 },
754 };
755
756 pkt.members = new_members;
757 pkt.membership_root = new_root;
758 pkt.group_pk = group_pk;
759 pkt.group_sig = sig.0.to_vec();
760
761 group_identity_publish(pkt).await
762}
763
764async fn store_replicated(
766 handle: &IdentityHandle,
767 data: Vec<u8>,
768 replicas: usize,
769) -> Result<StorageHandle> {
770 let storage_id = Key::from(*blake3::hash(&data).as_bytes());
771
772 let presence = get_presence(handle.key()).await?;
774
775 let mut shard_map = crate::types::storage::ShardMap::new();
777 for (i, device) in presence.devices.iter().take(replicas).enumerate() {
778 shard_map.assign_shard(device.id, i as u32);
779 }
780
781 let mut dht = DHT.write().await;
783 dht.put(storage_id.clone(), data.clone()).await?;
784 drop(dht); Ok(StorageHandle {
787 id: storage_id,
788 size: data.len() as u64,
789 strategy: StorageStrategy::FullReplication { replicas },
790 shard_map,
791 sealed_key: Some(vec![0u8; 32]),
792 })
793}
794