1use crate::auth::Sig;
18use crate::fwid::{compute_key, fw_check, fw_to_key, Key};
19use crate::types::{
20 Device, DeviceId, Endpoint, Identity, IdentityHandle, MlDsaKeyPair, Presence,
21 PresenceReceipt, StorageHandle, StorageStrategy,
22};
23use anyhow::{Context, Result};
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28struct MockDht {
32 storage: HashMap<Key, Vec<u8>>,
33}
34
35impl MockDht {
36 fn new() -> Self {
37 Self {
38 storage: HashMap::new(),
39 }
40 }
41
42 async fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
43 self.storage.insert(key, value);
44 Ok(())
45 }
46
47 async fn get(&self, key: &Key) -> Result<Vec<u8>> {
48 self.storage
49 .get(key)
50 .cloned()
51 .ok_or_else(|| anyhow::anyhow!("Key not found"))
52 }
53}
54
55static DHT: once_cell::sync::Lazy<Arc<RwLock<MockDht>>> = once_cell::sync::Lazy::new(|| {
57 Arc::new(RwLock::new(MockDht::new()))
58});
59
60static GLOBAL_DHT_CLIENT: once_cell::sync::OnceCell<Arc<crate::dht::client::DhtClient>> =
62 once_cell::sync::OnceCell::new();
63
64pub fn set_dht_client(client: crate::dht::client::DhtClient) -> bool {
66 GLOBAL_DHT_CLIENT.set(Arc::new(client)).is_ok()
67}
68
69fn get_dht_client() -> Option<Arc<crate::dht::client::DhtClient>> {
70 GLOBAL_DHT_CLIENT.get().cloned()
71}
72
73async fn dht_put_bytes(key: &Key, value: Vec<u8>) -> Result<()> {
74 if let Some(client) = get_dht_client() {
75 let k = hex::encode(key.as_bytes());
76 let _ = client
77 .put(k, value)
78 .await
79 .context("Failed to store data in DHT client")?;
80 Ok(())
81 } else {
82 let mut dht = DHT.write().await;
83 dht.put(key.clone(), value).await
84 }
85}
86
87async fn dht_get_bytes(key: &Key) -> Result<Vec<u8>> {
88 if let Some(client) = get_dht_client() {
89 let k = hex::encode(key.as_bytes());
90 match client.get(k).await.context("DHT get failed")? {
91 Some(v) => Ok(v),
92 None => anyhow::bail!("Key not found"),
93 }
94 } else {
95 let dht = DHT.read().await;
96 dht.get(key).await
97 }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct IdentityPacketV1 {
107 pub v: u8,
108 pub words: [String; 4],
109 pub id: Key,
110 pub pk: Vec<u8>,
111 pub sig: Option<Vec<u8>>, pub device_set_root: Key,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct MemberRef {
118 pub member_id: Key,
119 pub member_pk: Vec<u8>,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct GroupIdentityPacketV1 {
125 pub v: u8,
126 pub words: [String; 4],
127 pub id: Key,
128 pub group_pk: Vec<u8>,
129 pub group_sig: Vec<u8>,
130 pub members: Vec<MemberRef>,
131 pub membership_root: Key,
132 pub created_at: u64,
133 pub mls_ciphersuite: Option<u16>,
134}
135
136#[derive(Clone)]
138pub struct GroupKeyPair {
139 pub group_pk: crate::quantum_crypto::MlDsaPublicKey,
140 pub group_sk: crate::quantum_crypto::MlDsaSecretKey,
141}
142
143impl std::fmt::Debug for GroupKeyPair {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 write!(
146 f,
147 "GroupKeyPair {{ group_pk: <{} bytes>, group_sk: <hidden> }}",
148 self.group_pk.as_bytes().len()
149 )
150 }
151}
152
153pub async fn register_identity(
166 words: [&str; 4],
167 keypair: &MlDsaKeyPair,
168) -> Result<IdentityHandle> {
169 let words_owned: [String; 4] = [
171 words[0].to_string(),
172 words[1].to_string(),
173 words[2].to_string(),
174 words[3].to_string(),
175 ];
176
177 if !fw_check(words_owned.clone()) {
179 anyhow::bail!("Invalid word in identity");
180 }
181
182 let key = fw_to_key(words_owned.clone())?;
184
185 let dht = DHT.read().await;
187 if dht.get(&key).await.is_ok() {
188 anyhow::bail!("Identity already registered");
189 }
190 drop(dht);
191
192 let identity = Identity {
194 words: words_owned.clone(),
195 key: key.clone(),
196 public_key: keypair.public_key.clone(),
197 };
198
199 let packet = IdentityPacketV1 {
200 v: 1,
201 words: words_owned.clone(),
202 id: key.clone(),
203 pk: keypair.public_key.clone(),
204 sig: None,
205 device_set_root: compute_key("device-set", key.as_bytes()),
206 };
207
208 dht_put_bytes(&key, serde_json::to_vec(&packet)?).await?;
209
210 Ok(IdentityHandle::new(identity, keypair.clone()))
211}
212
213pub async fn get_identity(key: Key) -> Result<Identity> {
221 let data = dht_get_bytes(&key).await.context("Identity not found")?;
223 if let Ok(pkt) = serde_json::from_slice::<IdentityPacketV1>(&data) {
224 let identity = Identity {
225 words: pkt.words,
226 key: pkt.id,
227 public_key: pkt.pk,
228 };
229 return Ok(identity);
230 }
231 let identity: Identity = serde_json::from_slice(&data)?;
233 Ok(identity)
234}
235
236pub async fn identity_fetch(key: Key) -> Result<IdentityPacketV1> {
238 let data = dht_get_bytes(&key).await.context("Identity not found")?;
239 let pkt: IdentityPacketV1 = serde_json::from_slice(&data)?;
240 Ok(pkt)
241}
242
243pub async fn register_presence(
257 handle: &IdentityHandle,
258 devices: Vec<Device>,
259 active_device: DeviceId,
260) -> Result<PresenceReceipt> {
261 if !devices.iter().any(|d| d.id == active_device) {
263 anyhow::bail!("Active device not in device list");
264 }
265
266 let presence = Presence {
268 identity: handle.key(),
269 devices,
270 active_device: Some(active_device),
271 timestamp: std::time::SystemTime::now()
272 .duration_since(std::time::UNIX_EPOCH)?
273 .as_secs(),
274 signature: vec![], };
276
277 let presence_bytes = serde_json::to_vec(&presence)?;
279 let signature = handle.sign(&presence_bytes)?;
280
281 let mut signed_presence = presence;
282 signed_presence.signature = signature;
283
284 let presence_key = derive_presence_key(handle.key());
286 let mut dht = DHT.write().await;
287 dht.put(presence_key, serde_json::to_vec(&signed_presence)?).await?;
288
289 let receipt = PresenceReceipt {
291 identity: handle.key(),
292 timestamp: signed_presence.timestamp,
293 storing_nodes: vec![Key::from([0u8; 32])], };
295
296 Ok(receipt)
297}
298
299pub async fn get_presence(identity_key: Key) -> Result<Presence> {
307 let presence_key = derive_presence_key(identity_key);
308 let dht = DHT.read().await;
309 let data = dht.get(&presence_key).await.context("Presence not found")?;
310 let presence: Presence = serde_json::from_slice(&data)?;
311 Ok(presence)
312}
313
314pub async fn register_headless(
324 handle: &IdentityHandle,
325 storage_gb: u32,
326 endpoint: Endpoint,
327) -> Result<DeviceId> {
328 let mut presence = get_presence(handle.key()).await?;
330
331 let device = Device {
333 id: DeviceId::generate(),
334 device_type: crate::types::presence::DeviceType::Headless,
335 storage_gb: storage_gb as u64,
336 endpoint,
337 capabilities: crate::types::presence::DeviceCapabilities {
338 storage_bytes: storage_gb as u64 * 1_000_000_000,
339 always_online: true,
340 supports_fec: true,
341 supports_seal: true,
342 ..Default::default()
343 },
344 };
345
346 let device_id = device.id;
347 presence.devices.push(device);
348
349 let active = presence.active_device.unwrap_or(device_id);
351 register_presence(handle, presence.devices, active).await?;
352
353 Ok(device_id)
354}
355
356pub async fn set_active_device(
362 handle: &IdentityHandle,
363 device_id: DeviceId,
364) -> Result<()> {
365 let presence = get_presence(handle.key()).await?;
367
368 if !presence.devices.iter().any(|d| d.id == device_id) {
370 anyhow::bail!("Device not found in presence");
371 }
372
373 register_presence(handle, presence.devices, device_id).await?;
375 Ok(())
376}
377
378pub async fn store_data(
392 handle: &IdentityHandle,
393 data: Vec<u8>,
394 group_size: usize,
395) -> Result<StorageHandle> {
396 let strategy = StorageStrategy::from_group_size(group_size);
398
399 match strategy {
400 StorageStrategy::Direct => store_direct(handle, data).await,
401 StorageStrategy::FullReplication { replicas } => {
402 store_replicated(handle, data, replicas).await
403 }
404 StorageStrategy::FecEncoded { data_shards, parity_shards, .. } => {
405 store_with_fec(handle, data, data_shards, parity_shards).await
406 }
407 }
408}
409
410pub async fn store_dyad(
420 handle1: &IdentityHandle,
421 _handle2_key: Key,
422 data: Vec<u8>,
423) -> Result<StorageHandle> {
424 store_replicated(handle1, data, 2).await
426}
427
428pub async fn store_with_fec(
439 handle: &IdentityHandle,
440 data: Vec<u8>,
441 data_shards: usize,
442 parity_shards: usize,
443) -> Result<StorageHandle> {
444 let storage_id = Key::from(*blake3::hash(&data).as_bytes());
446
447 let mut shard_map = crate::types::storage::ShardMap::new();
452
453 let presence = get_presence(handle.key()).await?;
455
456 let mut devices = presence.devices.clone();
458 devices.sort_by_key(|d| match d.device_type {
459 crate::types::presence::DeviceType::Headless => 0,
460 crate::types::presence::DeviceType::Active => 1,
461 crate::types::presence::DeviceType::Mobile => 2,
462 });
463
464 let total_shards = data_shards + parity_shards;
466 for (i, device) in devices.iter().take(total_shards).enumerate() {
467 shard_map.assign_shard(device.id, i as u32);
468 }
469
470 let mut dht = DHT.write().await;
472 dht.put(storage_id.clone(), data.clone()).await?;
473
474 let handle = StorageHandle {
476 id: storage_id,
477 size: data.len() as u64,
478 strategy: StorageStrategy::FecEncoded {
479 data_shards,
480 parity_shards,
481 shard_size: 65536,
482 },
483 shard_map,
484 sealed_key: Some(vec![0u8; 32]), };
486
487 Ok(handle)
488}
489
490pub async fn get_data(handle: &StorageHandle) -> Result<Vec<u8>> {
498 let dht = DHT.read().await;
502 let data = dht.get(&handle.id).await.context("Data not found")?;
503 Ok(data)
504}
505
506fn derive_presence_key(identity_key: Key) -> Key {
512 let mut hasher = blake3::Hasher::new();
513 hasher.update(b"presence:");
514 hasher.update(identity_key.as_bytes());
515 Key::from(*hasher.finalize().as_bytes())
516}
517
518async fn store_direct(handle: &IdentityHandle, data: Vec<u8>) -> Result<StorageHandle> {
520 let storage_id = Key::from(*blake3::hash(&data).as_bytes());
521
522 let mut dht = DHT.write().await;
524 dht.put(storage_id.clone(), data.clone()).await?;
525
526 let presence = get_presence(handle.key()).await?;
528 let device = presence.devices.first().context("No devices available")?;
529
530 let mut shard_map = crate::types::storage::ShardMap::new();
531 shard_map.assign_shard(device.id, 0);
532
533 Ok(StorageHandle {
534 id: storage_id,
535 size: data.len() as u64,
536 strategy: StorageStrategy::Direct,
537 shard_map,
538 sealed_key: Some(vec![0u8; 32]),
539 })
540}
541
542pub fn group_identity_canonical_sign_bytes(id: &Key, membership_root: &Key) -> Vec<u8> {
548 let mut out = Vec::with_capacity(16 + 32 + 32);
549 out.extend_from_slice(b"saorsa-group:identity:v1");
550 out.extend_from_slice(id.as_bytes());
551 out.extend_from_slice(membership_root.as_bytes());
552 out
553}
554
555fn compute_membership_root(members: &[MemberRef]) -> Key {
556 let mut ids: Vec<[u8; 32]> = members.iter().map(|m| *m.member_id.as_bytes()).collect();
557 ids.sort_unstable();
558 let mut hasher = blake3::Hasher::new();
559 for id in ids {
560 hasher.update(&id);
561 }
562 Key::from(*hasher.finalize().as_bytes())
563}
564
565pub fn group_identity_create(
567 words: [String; 4],
568 members: Vec<MemberRef>,
569) -> Result<(GroupIdentityPacketV1, GroupKeyPair)> {
570 if !fw_check(words.clone()) {
572 anyhow::bail!("Invalid group words");
573 }
574 let id = fw_to_key(words.clone())?;
575
576 use crate::quantum_crypto::{MlDsa65, MlDsaOperations};
578 let ml = MlDsa65::new();
579 let (group_pk, group_sk) = ml
580 .generate_keypair()
581 .map_err(|e| anyhow::anyhow!("group keypair generation failed: {e:?}"))?;
582
583 let membership_root = compute_membership_root(&members);
585 let msg = group_identity_canonical_sign_bytes(&id, &membership_root);
586 let sig = ml
587 .sign(&group_sk, &msg)
588 .map_err(|e| anyhow::anyhow!("group sign failed: {e:?}"))?;
589
590 let pkt = GroupIdentityPacketV1 {
591 v: 1,
592 words,
593 id: id.clone(),
594 group_pk: group_pk.as_bytes().to_vec(),
595 group_sig: sig.0.to_vec(),
596 members,
597 membership_root,
598 created_at: std::time::SystemTime::now()
599 .duration_since(std::time::UNIX_EPOCH)
600 .unwrap_or_default()
601 .as_secs(),
602 mls_ciphersuite: None,
603 };
604
605 Ok((pkt, GroupKeyPair { group_pk, group_sk }))
606}
607
608pub async fn group_identity_publish(packet: GroupIdentityPacketV1) -> Result<()> {
610 let root = compute_membership_root(&packet.members);
612 if root != packet.membership_root {
613 anyhow::bail!("membership_root mismatch");
614 }
615 use crate::quantum_crypto::{MlDsa65, MlDsaOperations, MlDsaPublicKey, MlDsaSignature};
617 const SIG_LEN: usize = 3309;
618 if packet.group_sig.len() != SIG_LEN {
619 anyhow::bail!("invalid signature length");
620 }
621 let mut sig_arr = [0u8; SIG_LEN];
622 sig_arr.copy_from_slice(&packet.group_sig);
623 let sig = MlDsaSignature(Box::new(sig_arr));
624 let pk = MlDsaPublicKey::from_bytes(&packet.group_pk)
625 .map_err(|_| anyhow::anyhow!("invalid group_pk"))?;
626 let ml = MlDsa65::new();
627 let msg = group_identity_canonical_sign_bytes(&packet.id, &packet.membership_root);
628 let ok = ml
629 .verify(&pk, &msg, &sig)
630 .map_err(|e| anyhow::anyhow!("verify failed: {e:?}"))?;
631 if !ok {
632 anyhow::bail!("group signature invalid");
633 }
634 dht_put_bytes(&packet.id, serde_json::to_vec(&packet)?).await
635}
636
637pub async fn group_identity_fetch(id_key: Key) -> Result<GroupIdentityPacketV1> {
639 let data = dht_get_bytes(&id_key).await.context("Group not found")?;
640 let pkt: GroupIdentityPacketV1 = serde_json::from_slice(&data)?;
641 Ok(pkt)
642}
643
644pub async fn group_identity_update_members_signed(
646 id_key: Key,
647 new_members: Vec<MemberRef>,
648 group_pk: Vec<u8>,
649 group_sig: Sig,
650) -> Result<()> {
651 let new_root = compute_membership_root(&new_members);
653 use crate::quantum_crypto::{MlDsa65, MlDsaOperations, MlDsaPublicKey, MlDsaSignature};
654 const SIG_LEN: usize = 3309;
655 let sig_bytes = group_sig.as_bytes();
656 if sig_bytes.len() != SIG_LEN {
657 anyhow::bail!("invalid signature length");
658 }
659 let mut sig_arr = [0u8; SIG_LEN];
660 sig_arr.copy_from_slice(sig_bytes);
661 let sig = MlDsaSignature(Box::new(sig_arr));
662 let pk = MlDsaPublicKey::from_bytes(&group_pk)
663 .map_err(|_| anyhow::anyhow!("invalid group_pk"))?;
664 let ml = MlDsa65::new();
665 let msg = group_identity_canonical_sign_bytes(&id_key, &new_root);
666 let ok = ml
667 .verify(&pk, &msg, &sig)
668 .map_err(|e| anyhow::anyhow!("verify failed: {e:?}"))?;
669 if !ok {
670 anyhow::bail!("group signature invalid");
671 }
672
673 let mut pkt = match group_identity_fetch(id_key.clone()).await {
675 Ok(p) => p,
676 Err(_) => GroupIdentityPacketV1 {
677 v: 1,
678 words: [String::new(), String::new(), String::new(), String::new()],
679 id: id_key.clone(),
680 group_pk: group_pk.clone(),
681 group_sig: sig.0.clone().to_vec(),
682 members: Vec::new(),
683 membership_root: new_root.clone(),
684 created_at: std::time::SystemTime::now()
685 .duration_since(std::time::UNIX_EPOCH)
686 .unwrap_or_default()
687 .as_secs(),
688 mls_ciphersuite: None,
689 },
690 };
691
692 pkt.members = new_members;
693 pkt.membership_root = new_root;
694 pkt.group_pk = group_pk;
695 pkt.group_sig = sig.0.to_vec();
696
697 group_identity_publish(pkt).await
698}
699
700async fn store_replicated(
702 handle: &IdentityHandle,
703 data: Vec<u8>,
704 replicas: usize,
705) -> Result<StorageHandle> {
706 let storage_id = Key::from(*blake3::hash(&data).as_bytes());
707
708 let mut dht = DHT.write().await;
710 dht.put(storage_id.clone(), data.clone()).await?;
711
712 let presence = get_presence(handle.key()).await?;
714 let mut shard_map = crate::types::storage::ShardMap::new();
715
716 for (i, device) in presence.devices.iter().take(replicas).enumerate() {
717 shard_map.assign_shard(device.id, i as u32);
718 }
719
720 Ok(StorageHandle {
721 id: storage_id,
722 size: data.len() as u64,
723 strategy: StorageStrategy::FullReplication { replicas },
724 shard_map,
725 sealed_key: Some(vec![0u8; 32]),
726 })
727}