saorsa_core/
api.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9
10//! Clean API implementation for saorsa-core
11//!
12//! This module provides the simplified public API for:
13//! - Identity registration and management
14//! - Presence and device management
15//! - Storage with saorsa-seal and saorsa-fec
16
17use crate::auth::Sig;
18use crate::fwid::{Key, compute_key, fw_check, fw_to_key};
19use crate::types::{
20    Device, DeviceId, Endpoint, Identity, IdentityHandle, MlDsaKeyPair, Presence, PresenceReceipt,
21    StorageHandle, StorageStrategy,
22};
23use anyhow::{Context, Result};
24use serde::{Deserialize, Serialize};
25use std::collections::HashMap;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28// tracing not currently used in this module
29
30// Mock DHT for fallback when no global DHT client is installed
31struct MockDht {
32    storage: HashMap<Key, Vec<u8>>,
33}
34
35impl MockDht {
36    fn new() -> Self {
37        Self {
38            storage: HashMap::new(),
39        }
40    }
41
42    async fn put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
43        self.storage.insert(key, value);
44        Ok(())
45    }
46
47    async fn get(&self, key: &Key) -> Result<Vec<u8>> {
48        self.storage
49            .get(key)
50            .cloned()
51            .ok_or_else(|| anyhow::anyhow!("Key not found"))
52    }
53}
54
55// Global DHT instance for testing
56static DHT: once_cell::sync::Lazy<Arc<RwLock<MockDht>>> =
57    once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(MockDht::new())));
58
59// Optional global DHT client (real engine). If not set, we fall back to MockDht.
60static GLOBAL_DHT_CLIENT: once_cell::sync::OnceCell<Arc<crate::dht::client::DhtClient>> =
61    once_cell::sync::OnceCell::new();
62
63/// Install a process-global DHT client for API operations.
64pub fn set_dht_client(client: crate::dht::client::DhtClient) -> bool {
65    GLOBAL_DHT_CLIENT.set(Arc::new(client)).is_ok()
66}
67
68fn get_dht_client() -> Option<Arc<crate::dht::client::DhtClient>> {
69    GLOBAL_DHT_CLIENT.get().cloned()
70}
71
72async fn dht_put_bytes(key: &Key, value: Vec<u8>) -> Result<()> {
73    if let Some(client) = get_dht_client() {
74        let k = hex::encode(key.as_bytes());
75        let _ = client
76            .put(k, value)
77            .await
78            .context("Failed to store data in DHT client")?;
79        Ok(())
80    } else {
81        let mut dht = DHT.write().await;
82        dht.put(key.clone(), value).await
83    }
84}
85
86async fn dht_get_bytes(key: &Key) -> Result<Vec<u8>> {
87    if let Some(client) = get_dht_client() {
88        let k = hex::encode(key.as_bytes());
89        match client.get(k).await.context("DHT get failed")? {
90            Some(v) => Ok(v),
91            None => anyhow::bail!("Key not found"),
92        }
93    } else {
94        let dht = DHT.read().await;
95        dht.get(key).await
96    }
97}
98
99// =============================================================================
100// API-visible record types (minimal, per AGENTS_API.md)
101// =============================================================================
102
103/// Minimal identity packet compatible with Communitas group flows
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct IdentityPacketV1 {
106    pub v: u8,
107    pub words: [String; 4],
108    pub id: Key,
109    pub pk: Vec<u8>,
110    pub sig: Option<Vec<u8>>, // optional when registered locally
111    pub device_set_root: Key,
112}
113
114/// Member reference for group identities
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct MemberRef {
117    pub member_id: Key,
118    pub member_pk: Vec<u8>,
119}
120
121/// Group identity packet (canonical)
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct GroupIdentityPacketV1 {
124    pub v: u8,
125    pub words: [String; 4],
126    pub id: Key,
127    pub group_pk: Vec<u8>,
128    pub group_sig: Vec<u8>,
129    pub members: Vec<MemberRef>,
130    pub membership_root: Key,
131    pub created_at: u64,
132    pub mls_ciphersuite: Option<u16>,
133}
134
135/// Keypair for group signatures
136#[derive(Clone)]
137pub struct GroupKeyPair {
138    pub group_pk: crate::quantum_crypto::MlDsaPublicKey,
139    pub group_sk: crate::quantum_crypto::MlDsaSecretKey,
140}
141
142impl std::fmt::Debug for GroupKeyPair {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        write!(
145            f,
146            "GroupKeyPair {{ group_pk: <{} bytes>, group_sk: <hidden> }}",
147            self.group_pk.as_bytes().len()
148        )
149    }
150}
151
152// ============================================================================
153// IDENTITY API
154// ============================================================================
155
156/// Register a new identity on the network
157///
158/// # Arguments
159/// * `words` - Four-word identifier (must be valid dictionary words)
160/// * `keypair` - ML-DSA keypair for signing
161///
162/// # Returns
163/// * `IdentityHandle` - Handle for identity operations
164pub async fn register_identity(words: [&str; 4], keypair: &MlDsaKeyPair) -> Result<IdentityHandle> {
165    // Convert to owned strings
166    let words_owned: [String; 4] = [
167        words[0].to_string(),
168        words[1].to_string(),
169        words[2].to_string(),
170        words[3].to_string(),
171    ];
172
173    // Validate words
174    if !fw_check(words_owned.clone()) {
175        anyhow::bail!("Invalid word in identity");
176    }
177
178    // Generate key from words
179    let key = fw_to_key(words_owned.clone())?;
180
181    // Check if already registered
182    let dht = DHT.read().await;
183    if dht.get(&key).await.is_ok() {
184        anyhow::bail!("Identity already registered");
185    }
186    drop(dht);
187
188    // Create identity (typed) and store packet for compatibility
189    let identity = Identity {
190        words: words_owned.clone(),
191        key: key.clone(),
192        public_key: keypair.public_key.clone(),
193    };
194
195    let packet = IdentityPacketV1 {
196        v: 1,
197        words: words_owned.clone(),
198        id: key.clone(),
199        pk: keypair.public_key.clone(),
200        sig: None,
201        device_set_root: compute_key("device-set", key.as_bytes()),
202    };
203
204    dht_put_bytes(&key, serde_json::to_vec(&packet)?).await?;
205
206    Ok(IdentityHandle::new(identity, keypair.clone()))
207}
208
209/// Get an identity by its key
210///
211/// # Arguments
212/// * `key` - Identity key (derived from four-word address)
213///
214/// # Returns
215/// * `Identity` - The identity information
216pub async fn get_identity(key: Key) -> Result<Identity> {
217    // Try to read the identity packet and map back to Identity struct
218    let data = dht_get_bytes(&key).await.context("Identity not found")?;
219    if let Ok(pkt) = serde_json::from_slice::<IdentityPacketV1>(&data) {
220        let identity = Identity {
221            words: pkt.words,
222            key: pkt.id,
223            public_key: pkt.pk,
224        };
225        return Ok(identity);
226    }
227    // Fallback: legacy storage of Identity
228    let identity: Identity = serde_json::from_slice(&data)?;
229    Ok(identity)
230}
231
232/// Fetch identity packet in canonical format
233pub async fn identity_fetch(key: Key) -> Result<IdentityPacketV1> {
234    let data = dht_get_bytes(&key).await.context("Identity not found")?;
235    let pkt: IdentityPacketV1 = serde_json::from_slice(&data)?;
236    Ok(pkt)
237}
238
239// ============================================================================
240// PRESENCE API
241// ============================================================================
242
243/// Register presence on the network
244///
245/// # Arguments
246/// * `handle` - Identity handle
247/// * `devices` - List of devices for this identity
248/// * `active_device` - Currently active device ID
249///
250/// # Returns
251/// * `PresenceReceipt` - Receipt of presence registration
252pub async fn register_presence(
253    handle: &IdentityHandle,
254    devices: Vec<Device>,
255    active_device: DeviceId,
256) -> Result<PresenceReceipt> {
257    // Validate active device is in list
258    if !devices.iter().any(|d| d.id == active_device) {
259        anyhow::bail!("Active device not in device list");
260    }
261
262    // Create presence packet
263    let presence = Presence {
264        identity: handle.key(),
265        devices,
266        active_device: Some(active_device),
267        timestamp: std::time::SystemTime::now()
268            .duration_since(std::time::UNIX_EPOCH)?
269            .as_secs(),
270        signature: vec![], // Will be filled
271    };
272
273    // Sign presence
274    let presence_bytes = serde_json::to_vec(&presence)?;
275    let signature = handle.sign(&presence_bytes)?;
276
277    let mut signed_presence = presence;
278    signed_presence.signature = signature;
279
280    // Store in DHT with presence key
281    let presence_key = derive_presence_key(handle.key());
282    let mut dht = DHT.write().await;
283    dht.put(presence_key, serde_json::to_vec(&signed_presence)?)
284        .await?;
285
286    // Create receipt
287    let receipt = PresenceReceipt {
288        identity: handle.key(),
289        timestamp: signed_presence.timestamp,
290        storing_nodes: vec![Key::from([0u8; 32])], // Mock node
291    };
292
293    Ok(receipt)
294}
295
296/// Get presence information for an identity
297///
298/// # Arguments
299/// * `identity_key` - Key of the identity
300///
301/// # Returns
302/// * `Presence` - Current presence information
303pub async fn get_presence(identity_key: Key) -> Result<Presence> {
304    let presence_key = derive_presence_key(identity_key);
305    let dht = DHT.read().await;
306    let data = dht.get(&presence_key).await.context("Presence not found")?;
307    let presence: Presence = serde_json::from_slice(&data)?;
308    Ok(presence)
309}
310
311/// Register a headless storage node
312///
313/// # Arguments
314/// * `handle` - Identity handle
315/// * `storage_gb` - Storage capacity in GB
316/// * `endpoint` - Network endpoint
317///
318/// # Returns
319/// * `DeviceId` - ID of the registered headless node
320pub async fn register_headless(
321    handle: &IdentityHandle,
322    storage_gb: u32,
323    endpoint: Endpoint,
324) -> Result<DeviceId> {
325    // Get current presence
326    let mut presence = get_presence(handle.key()).await?;
327
328    // Create headless device
329    let device = Device {
330        id: DeviceId::generate(),
331        device_type: crate::types::presence::DeviceType::Headless,
332        storage_gb: storage_gb as u64,
333        endpoint,
334        capabilities: crate::types::presence::DeviceCapabilities {
335            storage_bytes: storage_gb as u64 * 1_000_000_000,
336            always_online: true,
337            supports_fec: true,
338            supports_seal: true,
339            ..Default::default()
340        },
341    };
342
343    let device_id = device.id;
344    presence.devices.push(device);
345
346    // Update presence
347    let active = presence.active_device.unwrap_or(device_id);
348    register_presence(handle, presence.devices, active).await?;
349
350    Ok(device_id)
351}
352
353/// Set the active device for an identity
354///
355/// # Arguments
356/// * `handle` - Identity handle
357/// * `device_id` - Device to make active
358pub async fn set_active_device(handle: &IdentityHandle, device_id: DeviceId) -> Result<()> {
359    // Get current presence
360    let presence = get_presence(handle.key()).await?;
361
362    // Validate device exists
363    if !presence.devices.iter().any(|d| d.id == device_id) {
364        anyhow::bail!("Device not found in presence");
365    }
366
367    // Update with new active device
368    register_presence(handle, presence.devices, device_id).await?;
369    Ok(())
370}
371
372// ============================================================================
373// STORAGE API
374// ============================================================================
375
376/// Store data on the network
377///
378/// # Arguments
379/// * `handle` - Identity handle
380/// * `data` - Data to store
381/// * `group_size` - Size of the group (affects storage strategy)
382///
383/// # Returns
384/// * `StorageHandle` - Handle to retrieve the data
385pub async fn store_data(
386    handle: &IdentityHandle,
387    data: Vec<u8>,
388    group_size: usize,
389) -> Result<StorageHandle> {
390    // Select strategy based on group size
391    let strategy = StorageStrategy::from_group_size(group_size);
392
393    match strategy {
394        StorageStrategy::Direct => store_direct(handle, data).await,
395        StorageStrategy::FullReplication { replicas } => {
396            store_replicated(handle, data, replicas).await
397        }
398        StorageStrategy::FecEncoded {
399            data_shards,
400            parity_shards,
401            ..
402        } => store_with_fec(handle, data, data_shards, parity_shards).await,
403    }
404}
405
406/// Store data for a dyad (2-person group)
407///
408/// # Arguments
409/// * `handle1` - First identity handle
410/// * `handle2_key` - Key of second identity
411/// * `data` - Data to store
412///
413/// # Returns
414/// * `StorageHandle` - Handle to retrieve the data
415pub async fn store_dyad(
416    handle1: &IdentityHandle,
417    _handle2_key: Key,
418    data: Vec<u8>,
419) -> Result<StorageHandle> {
420    // For dyads, use full replication (2 copies)
421    store_replicated(handle1, data, 2).await
422}
423
424/// Store data with custom FEC parameters
425///
426/// # Arguments
427/// * `handle` - Identity handle
428/// * `data` - Data to store
429/// * `data_shards` - Number of data shards (k)
430/// * `parity_shards` - Number of parity shards (m)
431///
432/// # Returns
433/// * `StorageHandle` - Handle to retrieve the data
434pub async fn store_with_fec(
435    handle: &IdentityHandle,
436    data: Vec<u8>,
437    data_shards: usize,
438    parity_shards: usize,
439) -> Result<StorageHandle> {
440    // Generate storage ID
441    let storage_id = Key::from(*blake3::hash(&data).as_bytes());
442
443    // TODO: Actual FEC encoding with saorsa-fec
444    // For now, just store the data directly
445
446    // Create shard map (mock)
447    let mut shard_map = crate::types::storage::ShardMap::new();
448
449    // Get presence to find devices
450    let presence = get_presence(handle.key()).await?;
451
452    // Separate devices by type for weighted distribution
453    let mut headless_devices: Vec<_> = presence
454        .devices
455        .iter()
456        .filter(|d| d.device_type == crate::types::presence::DeviceType::Headless)
457        .collect();
458    let mut active_devices: Vec<_> = presence
459        .devices
460        .iter()
461        .filter(|d| d.device_type == crate::types::presence::DeviceType::Active)
462        .collect();
463    let mobile_devices: Vec<_> = presence
464        .devices
465        .iter()
466        .filter(|d| d.device_type == crate::types::presence::DeviceType::Mobile)
467        .collect();
468
469    // Sort each category by storage capacity (descending)
470    headless_devices.sort_by(|a, b| b.storage_gb.cmp(&a.storage_gb));
471    active_devices.sort_by(|a, b| b.storage_gb.cmp(&a.storage_gb));
472
473    // Assign shards with weighted distribution:
474    // - Headless devices: get most shards (prefer always-online, high capacity)
475    // - Active devices: get some shards
476    // - Mobile devices: minimal/no shards (unreliable, low capacity)
477    let total_shards = data_shards + parity_shards;
478    let mut shard_idx = 0u32;
479
480    // Headless devices get priority - assign multiple shards if needed
481    let headless_count = headless_devices.len();
482    if headless_count > 0 {
483        // Headless devices should get at least 60% of shards
484        let min_headless_shards = (total_shards * 3).div_ceil(5); // ceil(60%)
485        let shards_per_headless = min_headless_shards.div_ceil(headless_count);
486
487        for device in &headless_devices {
488            for _ in 0..shards_per_headless {
489                if (shard_idx as usize) < total_shards {
490                    shard_map.assign_shard(device.id, shard_idx);
491                    shard_idx += 1;
492                }
493            }
494        }
495    }
496
497    // Active devices get remaining shards (excluding mobile allocation)
498    for device in &active_devices {
499        if (shard_idx as usize) < total_shards {
500            shard_map.assign_shard(device.id, shard_idx);
501            shard_idx += 1;
502        }
503    }
504
505    // Mobile devices only get shards if we have no other option
506    if (shard_idx as usize) < total_shards
507        && headless_devices.is_empty()
508        && active_devices.is_empty()
509    {
510        for device in &mobile_devices {
511            if (shard_idx as usize) < total_shards {
512                shard_map.assign_shard(device.id, shard_idx);
513                shard_idx += 1;
514            }
515        }
516    }
517
518    // If we still don't have enough, cycle through available devices
519    while (shard_idx as usize) < total_shards {
520        let all_devices: Vec<_> = headless_devices
521            .iter()
522            .chain(active_devices.iter())
523            .collect();
524        if all_devices.is_empty() {
525            break;
526        }
527        let device = all_devices[(shard_idx as usize) % all_devices.len()];
528        shard_map.assign_shard(device.id, shard_idx);
529        shard_idx += 1;
530    }
531
532    // Store data in DHT
533    let mut dht = DHT.write().await;
534    dht.put(storage_id.clone(), data.clone()).await?;
535
536    // Create storage handle
537    let handle = StorageHandle {
538        id: storage_id,
539        size: data.len() as u64,
540        strategy: StorageStrategy::FecEncoded {
541            data_shards,
542            parity_shards,
543            shard_size: 65536,
544        },
545        shard_map,
546        sealed_key: Some(vec![0u8; 32]), // Mock sealed key
547    };
548
549    Ok(handle)
550}
551
552/// Retrieve data from the network
553///
554/// # Arguments
555/// * `handle` - Storage handle
556///
557/// # Returns
558/// * `Vec<u8>` - The retrieved data
559pub async fn get_data(handle: &StorageHandle) -> Result<Vec<u8>> {
560    // TODO: Handle different strategies (FEC decoding, unsealing, etc.)
561    // For now, just retrieve from DHT
562
563    let dht = DHT.read().await;
564    let data = dht.get(&handle.id).await.context("Data not found")?;
565    Ok(data)
566}
567
568// ============================================================================
569// HELPER FUNCTIONS
570// ============================================================================
571
572/// Derive presence key from identity key
573fn derive_presence_key(identity_key: Key) -> Key {
574    let mut hasher = blake3::Hasher::new();
575    hasher.update(b"presence:");
576    hasher.update(identity_key.as_bytes());
577    Key::from(*hasher.finalize().as_bytes())
578}
579
580/// Store data directly (no redundancy)
581async fn store_direct(handle: &IdentityHandle, data: Vec<u8>) -> Result<StorageHandle> {
582    let storage_id = Key::from(*blake3::hash(&data).as_bytes());
583
584    // Get presence BEFORE acquiring DHT write lock to avoid deadlock
585    let presence = get_presence(handle.key()).await?;
586    let device = presence.devices.first().context("No devices available")?;
587    let device_id = device.id;
588
589    // Store in DHT
590    let mut dht = DHT.write().await;
591    dht.put(storage_id.clone(), data.clone()).await?;
592    drop(dht); // Release lock early
593
594    let mut shard_map = crate::types::storage::ShardMap::new();
595    shard_map.assign_shard(device_id, 0);
596
597    Ok(StorageHandle {
598        id: storage_id,
599        size: data.len() as u64,
600        strategy: StorageStrategy::Direct,
601        shard_map,
602        sealed_key: Some(vec![0u8; 32]),
603    })
604}
605
606// ============================================================================
607// GROUP API (per AGENTS_API.md, minimal subset used by Communitas)
608// ============================================================================
609
610/// Canonical bytes for group identity signing: b"saorsa-group:identity:v1" || id || membership_root
611pub fn group_identity_canonical_sign_bytes(id: &Key, membership_root: &Key) -> Vec<u8> {
612    let mut out = Vec::with_capacity(16 + 32 + 32);
613    out.extend_from_slice(b"saorsa-group:identity:v1");
614    out.extend_from_slice(id.as_bytes());
615    out.extend_from_slice(membership_root.as_bytes());
616    out
617}
618
619fn compute_membership_root(members: &[MemberRef]) -> Key {
620    let mut ids: Vec<[u8; 32]> = members.iter().map(|m| *m.member_id.as_bytes()).collect();
621    ids.sort_unstable();
622    let mut hasher = blake3::Hasher::new();
623    for id in ids {
624        hasher.update(&id);
625    }
626    Key::from(*hasher.finalize().as_bytes())
627}
628
629/// Create a canonical group identity and keypair
630pub fn group_identity_create(
631    words: [String; 4],
632    members: Vec<MemberRef>,
633) -> Result<(GroupIdentityPacketV1, GroupKeyPair)> {
634    // Validate words and id
635    if !fw_check(words.clone()) {
636        anyhow::bail!("Invalid group words");
637    }
638    let id = fw_to_key(words.clone())?;
639
640    // Generate ML-DSA group keypair
641    use crate::quantum_crypto::{MlDsa65, MlDsaOperations};
642    let ml = MlDsa65::new();
643    let (group_pk, group_sk) = ml
644        .generate_keypair()
645        .map_err(|e| anyhow::anyhow!("group keypair generation failed: {e:?}"))?;
646
647    // Compute membership root and sign canonical bytes
648    let membership_root = compute_membership_root(&members);
649    let msg = group_identity_canonical_sign_bytes(&id, &membership_root);
650    let sig = ml
651        .sign(&group_sk, &msg)
652        .map_err(|e| anyhow::anyhow!("group sign failed: {e:?}"))?;
653
654    let pkt = GroupIdentityPacketV1 {
655        v: 1,
656        words,
657        id: id.clone(),
658        group_pk: group_pk.as_bytes().to_vec(),
659        group_sig: sig.0.to_vec(),
660        members,
661        membership_root,
662        created_at: std::time::SystemTime::now()
663            .duration_since(std::time::UNIX_EPOCH)
664            .unwrap_or_default()
665            .as_secs(),
666        mls_ciphersuite: None,
667    };
668
669    Ok((pkt, GroupKeyPair { group_pk, group_sk }))
670}
671
672/// Publish a group identity packet under its id key
673pub async fn group_identity_publish(packet: GroupIdentityPacketV1) -> Result<()> {
674    // Basic validation: recompute root and signature check
675    let root = compute_membership_root(&packet.members);
676    if root != packet.membership_root {
677        anyhow::bail!("membership_root mismatch");
678    }
679    // Verify signature
680    use crate::quantum_crypto::{MlDsa65, MlDsaOperations, MlDsaPublicKey, MlDsaSignature};
681    const SIG_LEN: usize = 3309;
682    if packet.group_sig.len() != SIG_LEN {
683        anyhow::bail!("invalid signature length");
684    }
685    let mut sig_arr = [0u8; SIG_LEN];
686    sig_arr.copy_from_slice(&packet.group_sig);
687    let sig = MlDsaSignature(Box::new(sig_arr));
688    let pk = MlDsaPublicKey::from_bytes(&packet.group_pk)
689        .map_err(|_| anyhow::anyhow!("invalid group_pk"))?;
690    let ml = MlDsa65::new();
691    let msg = group_identity_canonical_sign_bytes(&packet.id, &packet.membership_root);
692    let ok = ml
693        .verify(&pk, &msg, &sig)
694        .map_err(|e| anyhow::anyhow!("verify failed: {e:?}"))?;
695    if !ok {
696        anyhow::bail!("group signature invalid");
697    }
698    dht_put_bytes(&packet.id, serde_json::to_vec(&packet)?).await
699}
700
701/// Fetch a group identity by id key
702pub async fn group_identity_fetch(id_key: Key) -> Result<GroupIdentityPacketV1> {
703    let data = dht_get_bytes(&id_key).await.context("Group not found")?;
704    let pkt: GroupIdentityPacketV1 = serde_json::from_slice(&data)?;
705    Ok(pkt)
706}
707
708/// Update group members with signature verification over canonical bytes
709pub async fn group_identity_update_members_signed(
710    id_key: Key,
711    new_members: Vec<MemberRef>,
712    group_pk: Vec<u8>,
713    group_sig: Sig,
714) -> Result<()> {
715    // Compute new root and verify signature
716    let new_root = compute_membership_root(&new_members);
717    use crate::quantum_crypto::{MlDsa65, MlDsaOperations, MlDsaPublicKey, MlDsaSignature};
718    const SIG_LEN: usize = 3309;
719    let sig_bytes = group_sig.as_bytes();
720    if sig_bytes.len() != SIG_LEN {
721        anyhow::bail!("invalid signature length");
722    }
723    let mut sig_arr = [0u8; SIG_LEN];
724    sig_arr.copy_from_slice(sig_bytes);
725    let sig = MlDsaSignature(Box::new(sig_arr));
726    let pk =
727        MlDsaPublicKey::from_bytes(&group_pk).map_err(|_| anyhow::anyhow!("invalid group_pk"))?;
728    let ml = MlDsa65::new();
729    let msg = group_identity_canonical_sign_bytes(&id_key, &new_root);
730    let ok = ml
731        .verify(&pk, &msg, &sig)
732        .map_err(|e| anyhow::anyhow!("verify failed: {e:?}"))?;
733    if !ok {
734        anyhow::bail!("group signature invalid");
735    }
736
737    // Fetch current (if exists) to preserve metadata
738    let mut pkt = match group_identity_fetch(id_key.clone()).await {
739        Ok(p) => p,
740        Err(_) => GroupIdentityPacketV1 {
741            v: 1,
742            words: [String::new(), String::new(), String::new(), String::new()],
743            id: id_key.clone(),
744            group_pk: group_pk.clone(),
745            group_sig: sig.0.clone().to_vec(),
746            members: Vec::new(),
747            membership_root: new_root.clone(),
748            created_at: std::time::SystemTime::now()
749                .duration_since(std::time::UNIX_EPOCH)
750                .unwrap_or_default()
751                .as_secs(),
752            mls_ciphersuite: None,
753        },
754    };
755
756    pkt.members = new_members;
757    pkt.membership_root = new_root;
758    pkt.group_pk = group_pk;
759    pkt.group_sig = sig.0.to_vec();
760
761    group_identity_publish(pkt).await
762}
763
764/// Store data with full replication
765async fn store_replicated(
766    handle: &IdentityHandle,
767    data: Vec<u8>,
768    replicas: usize,
769) -> Result<StorageHandle> {
770    let storage_id = Key::from(*blake3::hash(&data).as_bytes());
771
772    // Get presence BEFORE acquiring DHT write lock to avoid deadlock
773    let presence = get_presence(handle.key()).await?;
774
775    // Build shard map before acquiring lock
776    let mut shard_map = crate::types::storage::ShardMap::new();
777    for (i, device) in presence.devices.iter().take(replicas).enumerate() {
778        shard_map.assign_shard(device.id, i as u32);
779    }
780
781    // Store in DHT
782    let mut dht = DHT.write().await;
783    dht.put(storage_id.clone(), data.clone()).await?;
784    drop(dht); // Release lock early
785
786    Ok(StorageHandle {
787        id: storage_id,
788        size: data.len() as u64,
789        strategy: StorageStrategy::FullReplication { replicas },
790        shard_map,
791        sealed_key: Some(vec![0u8; 32]),
792    })
793}