Skip to main content

astrid_capabilities/
store.rs

1//! Capability token storage.
2//!
3//! Provides both in-memory (session) and persistent (`SurrealKV`) storage
4//! for capability tokens.
5
6use astrid_core::{Permission, TokenId};
7use astrid_storage::{KvStore, SurrealKvStore};
8use std::collections::HashMap;
9use std::path::Path;
10use std::sync::{Arc, RwLock};
11
12use crate::error::{CapabilityError, CapabilityResult};
13use crate::token::CapabilityToken;
14
15// -- Namespace constants --
16
17const NS_TOKENS: &str = "caps:tokens";
18const NS_REVOKED: &str = "caps:revoked";
19const NS_USED: &str = "caps:used";
20
21/// Tombstone value for presence-only KV entries (revoked/used markers).
22const PRESENCE_MARKER: &[u8] = &[1];
23
24/// Run an async future synchronously.
25///
26/// Handles three cases:
27/// - Inside an async context: uses a scoped thread to avoid the
28///   "cannot `block_on` from within a runtime" panic.
29/// - Outside a runtime: creates a temporary runtime.
30fn block_on<F>(f: F) -> F::Output
31where
32    F: std::future::Future + Send,
33    F::Output: Send,
34{
35    match tokio::runtime::Handle::try_current() {
36        Ok(handle) => std::thread::scope(|s| {
37            s.spawn(|| handle.block_on(f))
38                .join()
39                .expect("async thread panicked")
40        }),
41        Err(_) => tokio::runtime::Builder::new_current_thread()
42            .enable_all()
43            .build()
44            .expect("failed to create tokio runtime")
45            .block_on(f),
46    }
47}
48
49/// Capability store with both session and persistent storage.
50pub struct CapabilityStore {
51    /// Session tokens (in-memory, cleared on session end).
52    session_tokens: RwLock<HashMap<TokenId, CapabilityToken>>,
53    /// Persistent tokens (`KvStore` backed).
54    persistent_store: Option<Arc<dyn KvStore>>,
55    /// Revoked token IDs (quick lookup).
56    revoked: RwLock<std::collections::HashSet<TokenId>>,
57    /// Used single-use token IDs (replay protection).
58    used_tokens: RwLock<std::collections::HashSet<TokenId>>,
59}
60
61impl CapabilityStore {
62    /// Create an in-memory only store (no persistence).
63    #[must_use]
64    pub fn in_memory() -> Self {
65        Self {
66            session_tokens: RwLock::new(HashMap::new()),
67            persistent_store: None,
68            revoked: RwLock::new(std::collections::HashSet::new()),
69            used_tokens: RwLock::new(std::collections::HashSet::new()),
70        }
71    }
72
73    /// Create a store with persistence.
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the database cannot be opened or read.
78    pub fn with_persistence(path: impl AsRef<Path>) -> CapabilityResult<Self> {
79        let store =
80            SurrealKvStore::open(path).map_err(|e| CapabilityError::StorageError(e.to_string()))?;
81        let kv: Arc<dyn KvStore> = Arc::new(store);
82
83        let mut cap_store = Self {
84            session_tokens: RwLock::new(HashMap::new()),
85            persistent_store: Some(kv),
86            revoked: RwLock::new(std::collections::HashSet::new()),
87            used_tokens: RwLock::new(std::collections::HashSet::new()),
88        };
89
90        // Load revoked and used tokens
91        cap_store.load_revoked()?;
92        cap_store.load_used_tokens()?;
93
94        Ok(cap_store)
95    }
96
97    /// Create a store backed by an existing `KvStore` (for shared stores).
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if loading existing revoked/used tokens fails.
102    pub fn with_kv_store(store: Arc<dyn KvStore>) -> CapabilityResult<Self> {
103        let mut cap_store = Self {
104            session_tokens: RwLock::new(HashMap::new()),
105            persistent_store: Some(store),
106            revoked: RwLock::new(std::collections::HashSet::new()),
107            used_tokens: RwLock::new(std::collections::HashSet::new()),
108        };
109
110        cap_store.load_revoked()?;
111        cap_store.load_used_tokens()?;
112
113        Ok(cap_store)
114    }
115
116    /// Load revoked token IDs from persistent storage.
117    fn load_revoked(&mut self) -> CapabilityResult<()> {
118        let Some(store) = &self.persistent_store else {
119            return Ok(());
120        };
121
122        let keys = block_on(store.list_keys(NS_REVOKED))
123            .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
124
125        let mut revoked = self
126            .revoked
127            .write()
128            .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
129
130        for key in keys {
131            if let Ok(uuid) = uuid::Uuid::parse_str(&key) {
132                revoked.insert(TokenId::from_uuid(uuid));
133            }
134        }
135
136        Ok(())
137    }
138
139    /// Load used single-use token IDs from persistent storage.
140    fn load_used_tokens(&mut self) -> CapabilityResult<()> {
141        let Some(store) = &self.persistent_store else {
142            return Ok(());
143        };
144
145        let keys = block_on(store.list_keys(NS_USED))
146            .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
147
148        let mut used = self
149            .used_tokens
150            .write()
151            .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
152
153        for key in keys {
154            if let Ok(uuid) = uuid::Uuid::parse_str(&key) {
155                used.insert(TokenId::from_uuid(uuid));
156            }
157        }
158
159        Ok(())
160    }
161
162    /// Add a capability token.
163    ///
164    /// # Errors
165    ///
166    /// Returns an error if the token is invalid or storage fails.
167    pub fn add(&self, token: CapabilityToken) -> CapabilityResult<()> {
168        // Validate the token first
169        token.validate()?;
170
171        match token.scope {
172            crate::token::TokenScope::Session => {
173                let mut tokens = self
174                    .session_tokens
175                    .write()
176                    .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
177                tokens.insert(token.id.clone(), token);
178            },
179            crate::token::TokenScope::Persistent => {
180                if let Some(store) = &self.persistent_store {
181                    let serialized = serde_json::to_vec(&token)
182                        .map_err(|e| CapabilityError::SerializationError(e.to_string()))?;
183
184                    let key = token.id.0.to_string();
185                    block_on(store.set(NS_TOKENS, &key, serialized))
186                        .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
187                } else {
188                    // Fall back to session storage if no persistence
189                    let mut tokens = self
190                        .session_tokens
191                        .write()
192                        .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
193                    tokens.insert(token.id.clone(), token);
194                }
195            },
196        }
197
198        Ok(())
199    }
200
201    /// Get a token by ID.
202    ///
203    /// # Errors
204    ///
205    /// Returns [`CapabilityError::TokenRevoked`] if the token has been revoked,
206    /// or a storage error if reading fails.
207    pub fn get(&self, token_id: &TokenId) -> CapabilityResult<Option<CapabilityToken>> {
208        // Check if revoked
209        {
210            let revoked = self
211                .revoked
212                .read()
213                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
214            if revoked.contains(token_id) {
215                return Err(CapabilityError::TokenRevoked {
216                    token_id: token_id.to_string(),
217                });
218            }
219        }
220
221        // Check session tokens first
222        {
223            let tokens = self
224                .session_tokens
225                .read()
226                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
227            if let Some(token) = tokens.get(token_id) {
228                return Ok(Some(token.clone()));
229            }
230        }
231
232        // Check persistent storage
233        if let Some(store) = &self.persistent_store {
234            let key = token_id.0.to_string();
235            let data = block_on(store.get(NS_TOKENS, &key))
236                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
237
238            if let Some(bytes) = data {
239                let token: CapabilityToken = serde_json::from_slice(&bytes)
240                    .map_err(|e| CapabilityError::SerializationError(e.to_string()))?;
241                // Defense in depth: re-verify persistent tokens on read.
242                // Session tokens were validated at add() time and live in
243                // trusted memory, but persistent tokens could be tampered on
244                // disk.
245                token.validate()?;
246                return Ok(Some(token));
247            }
248        }
249
250        Ok(None)
251    }
252
253    /// Check if a single-use token has already been consumed.
254    ///
255    /// Returns `Ok(true)` if the token is single-use and already consumed.
256    /// Returns `Ok(false)` if the token is not single-use or has not been used.
257    /// Returns `Err(())` on lock poisoning, to support fail-closed callers.
258    fn is_consumed_single_use(&self, token: &CapabilityToken) -> Result<bool, ()> {
259        if !token.is_single_use() {
260            return Ok(false);
261        }
262        let used = self.used_tokens.read().map_err(|_| ())?;
263        Ok(used.contains(&token.id))
264    }
265
266    /// Check if there's a capability for a resource and permission.
267    pub fn has_capability(&self, resource: &str, permission: Permission) -> bool {
268        // Check session tokens
269        if let Ok(tokens) = self.session_tokens.read() {
270            for token in tokens.values() {
271                if !token.is_expired() && token.grants(resource, permission) {
272                    match self.is_consumed_single_use(token) {
273                        Ok(true) => {},
274                        Ok(false) => return true,
275                        Err(()) => return false,
276                    }
277                }
278            }
279        }
280
281        // Check persistent tokens
282        if let Some(store) = &self.persistent_store
283            && let Ok(keys) = block_on(store.list_keys(NS_TOKENS))
284        {
285            for key in keys {
286                if let Ok(Some(data)) = block_on(store.get(NS_TOKENS, &key))
287                    && let Ok(token) = serde_json::from_slice::<CapabilityToken>(&data)
288                {
289                    // Defense in depth: validate persistent tokens (expiry +
290                    // signature). Uses validate() for consistency with get()
291                    // so future checks (e.g. nbf) are applied uniformly.
292                    if let Err(e) = token.validate() {
293                        if matches!(e, CapabilityError::TokenExpired { .. }) {
294                            tracing::debug!(token_id = %token.id, "skipping expired persistent token");
295                        } else {
296                            tracing::warn!(token_id = %token.id, "skipping invalid persistent token: {e}");
297                        }
298                        continue;
299                    }
300                    // Check if not revoked
301                    if let Ok(revoked) = self.revoked.read()
302                        && revoked.contains(&token.id)
303                    {
304                        continue;
305                    }
306                    if token.grants(resource, permission) {
307                        match self.is_consumed_single_use(&token) {
308                            Ok(true) => {},
309                            Ok(false) => return true,
310                            Err(()) => return false,
311                        }
312                    }
313                }
314            }
315        }
316
317        false
318    }
319
320    /// Find a token that grants a capability.
321    pub fn find_capability(
322        &self,
323        resource: &str,
324        permission: Permission,
325    ) -> Option<CapabilityToken> {
326        // Check session tokens
327        if let Ok(tokens) = self.session_tokens.read() {
328            for token in tokens.values() {
329                if !token.is_expired() && token.grants(resource, permission) {
330                    match self.is_consumed_single_use(token) {
331                        Ok(true) => {},
332                        Ok(false) => return Some(token.clone()),
333                        Err(()) => return None,
334                    }
335                }
336            }
337        }
338
339        // Check persistent tokens
340        if let Some(store) = &self.persistent_store
341            && let Ok(keys) = block_on(store.list_keys(NS_TOKENS))
342        {
343            for key in keys {
344                if let Ok(Some(data)) = block_on(store.get(NS_TOKENS, &key))
345                    && let Ok(token) = serde_json::from_slice::<CapabilityToken>(&data)
346                {
347                    // Defense in depth: validate persistent tokens (expiry +
348                    // signature). Uses validate() for consistency with get().
349                    if let Err(e) = token.validate() {
350                        if matches!(e, CapabilityError::TokenExpired { .. }) {
351                            tracing::debug!(token_id = %token.id, "skipping expired persistent token");
352                        } else {
353                            tracing::warn!(token_id = %token.id, "skipping invalid persistent token: {e}");
354                        }
355                        continue;
356                    }
357                    // Check if not revoked
358                    if let Ok(revoked) = self.revoked.read()
359                        && revoked.contains(&token.id)
360                    {
361                        continue;
362                    }
363                    if token.grants(resource, permission) {
364                        match self.is_consumed_single_use(&token) {
365                            Ok(true) => {},
366                            Ok(false) => return Some(token),
367                            Err(()) => return None,
368                        }
369                    }
370                }
371            }
372        }
373
374        None
375    }
376
377    /// Revoke a token.
378    ///
379    /// # Errors
380    ///
381    /// Returns an error if storage operations fail.
382    pub fn revoke(&self, token_id: &TokenId) -> CapabilityResult<()> {
383        // Persist revocation first so KV is the ground truth. If the daemon
384        // crashes after this point, `load_revoked()` will still see it on
385        // restart.
386        if let Some(store) = &self.persistent_store {
387            let key = token_id.0.to_string();
388
389            block_on(store.set(NS_REVOKED, &key, PRESENCE_MARKER.to_vec()))
390                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
391
392            if let Err(e) = block_on(store.delete(NS_TOKENS, &key)) {
393                tracing::warn!("failed to delete revoked token from caps:tokens: {e}");
394            }
395        }
396
397        // Update in-memory state (rebuilt from KV on restart regardless).
398        {
399            let mut revoked = self
400                .revoked
401                .write()
402                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
403            revoked.insert(token_id.clone());
404        }
405
406        {
407            let mut tokens = self
408                .session_tokens
409                .write()
410                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
411            tokens.remove(token_id);
412        }
413
414        Ok(())
415    }
416
417    /// Clear all session tokens.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if the lock cannot be acquired.
422    pub fn clear_session(&self) -> CapabilityResult<()> {
423        let mut tokens = self
424            .session_tokens
425            .write()
426            .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
427        tokens.clear();
428        Ok(())
429    }
430
431    /// Mark a single-use token as used.
432    ///
433    /// This should be called after successfully using a single-use token
434    /// to prevent replay attacks.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if the token was already used or storage fails.
439    pub fn mark_used(&self, token_id: &TokenId) -> CapabilityResult<()> {
440        // Hold a single write lock across check, persist, and insert to
441        // prevent TOCTOU races where two concurrent callers both pass
442        // the "already used?" check before either inserts.
443        let mut used = self
444            .used_tokens
445            .write()
446            .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
447
448        if used.contains(token_id) {
449            return Err(CapabilityError::TokenAlreadyUsed {
450                token_id: token_id.to_string(),
451            });
452        }
453
454        // Persist first so KV is the ground truth. If the daemon crashes
455        // after this point, `load_used_tokens()` will still see it on
456        // restart. Holding the write lock across `block_on` is safe
457        // because `block_on` spawns an OS thread and does not re-acquire
458        // any lock on this store.
459        if let Some(store) = &self.persistent_store {
460            block_on(store.set(NS_USED, &token_id.0.to_string(), PRESENCE_MARKER.to_vec()))
461                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
462        }
463
464        used.insert(token_id.clone());
465        Ok(())
466    }
467
468    /// Check if a single-use token has been used.
469    pub fn is_used(&self, token_id: &TokenId) -> bool {
470        self.used_tokens
471            .read()
472            .map(|used| used.contains(token_id))
473            .unwrap_or(false)
474    }
475
476    /// Validate and optionally consume a token.
477    ///
478    /// For single-use tokens, this marks them as used.
479    /// For regular tokens, this just validates them.
480    ///
481    /// # Errors
482    ///
483    /// Returns an error if the token is invalid, expired, revoked, or already used.
484    pub fn use_token(&self, token_id: &TokenId) -> CapabilityResult<CapabilityToken> {
485        let token = self
486            .get(token_id)?
487            .ok_or_else(|| CapabilityError::TokenNotFound {
488                token_id: token_id.to_string(),
489            })?;
490
491        // Validate the token
492        token.validate()?;
493
494        // For single-use tokens, mark as used
495        if token.is_single_use() {
496            self.mark_used(token_id)?;
497        }
498
499        Ok(token)
500    }
501
502    /// List all valid tokens.
503    ///
504    /// # Errors
505    ///
506    /// Returns an error if storage operations fail.
507    pub fn list_tokens(&self) -> CapabilityResult<Vec<CapabilityToken>> {
508        let mut tokens = Vec::new();
509
510        // Session tokens
511        {
512            let session = self
513                .session_tokens
514                .read()
515                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
516            for token in session.values() {
517                if !token.is_expired() {
518                    tokens.push(token.clone());
519                }
520            }
521        }
522
523        // Persistent tokens
524        if let Some(store) = &self.persistent_store {
525            let revoked = self
526                .revoked
527                .read()
528                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
529
530            let keys = block_on(store.list_keys(NS_TOKENS))
531                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
532
533            for key in keys {
534                let data = block_on(store.get(NS_TOKENS, &key))
535                    .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
536                if let Some(bytes) = data
537                    && let Ok(token) = serde_json::from_slice::<CapabilityToken>(&bytes)
538                    && !revoked.contains(&token.id)
539                    && !token.is_expired()
540                {
541                    tokens.push(token);
542                }
543            }
544        }
545
546        Ok(tokens)
547    }
548
549    /// Cleanup expired tokens from persistent storage.
550    ///
551    /// # Errors
552    ///
553    /// Returns an error if storage operations fail.
554    pub fn cleanup_expired(&self) -> CapabilityResult<usize> {
555        let mut removed: usize = 0;
556
557        if let Some(store) = &self.persistent_store {
558            let keys = block_on(store.list_keys(NS_TOKENS))
559                .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
560
561            for key in keys {
562                let data = block_on(store.get(NS_TOKENS, &key))
563                    .map_err(|e| CapabilityError::StorageError(e.to_string()))?;
564                if let Some(bytes) = data
565                    && let Ok(token) = serde_json::from_slice::<CapabilityToken>(&bytes)
566                    && token.is_expired()
567                {
568                    let _ = block_on(store.delete(NS_TOKENS, &key));
569                    removed = removed.saturating_add(1);
570                }
571            }
572        }
573
574        Ok(removed)
575    }
576}
577
578impl Default for CapabilityStore {
579    fn default() -> Self {
580        Self::in_memory()
581    }
582}
583
584impl std::fmt::Debug for CapabilityStore {
585    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586        let session_count = self.session_tokens.read().map(|t| t.len()).unwrap_or(0);
587        let revoked_count = self.revoked.read().map(|r| r.len()).unwrap_or(0);
588        let used_count = self.used_tokens.read().map(|u| u.len()).unwrap_or(0);
589        let has_persistence = self.persistent_store.is_some();
590
591        f.debug_struct("CapabilityStore")
592            .field("session_tokens", &session_count)
593            .field("revoked_count", &revoked_count)
594            .field("used_count", &used_count)
595            .field("has_persistence", &has_persistence)
596            .finish()
597    }
598}
599
600#[cfg(test)]
601mod tests {
602    use super::*;
603    use crate::pattern::ResourcePattern;
604    use crate::token::{AuditEntryId, TokenScope};
605    use astrid_crypto::KeyPair;
606    use astrid_storage::MemoryKvStore;
607
608    fn test_keypair() -> KeyPair {
609        KeyPair::generate()
610    }
611
612    #[tokio::test]
613    async fn test_in_memory_store() {
614        let store = CapabilityStore::in_memory();
615        let keypair = test_keypair();
616
617        let token = CapabilityToken::create(
618            ResourcePattern::exact("mcp://test:tool").unwrap(),
619            vec![Permission::Invoke],
620            TokenScope::Session,
621            keypair.key_id(),
622            AuditEntryId::new(),
623            &keypair,
624            None,
625        );
626
627        let token_id = token.id.clone();
628
629        store.add(token).unwrap();
630        assert!(store.has_capability("mcp://test:tool", Permission::Invoke));
631        assert!(store.get(&token_id).unwrap().is_some());
632    }
633
634    #[tokio::test]
635    async fn test_revoke() {
636        let store = CapabilityStore::in_memory();
637        let keypair = test_keypair();
638
639        let token = CapabilityToken::create(
640            ResourcePattern::exact("mcp://test:tool").unwrap(),
641            vec![Permission::Invoke],
642            TokenScope::Session,
643            keypair.key_id(),
644            AuditEntryId::new(),
645            &keypair,
646            None,
647        );
648
649        let token_id = token.id.clone();
650
651        store.add(token).unwrap();
652        assert!(store.has_capability("mcp://test:tool", Permission::Invoke));
653
654        store.revoke(&token_id).unwrap();
655        assert!(!store.has_capability("mcp://test:tool", Permission::Invoke));
656        assert!(matches!(
657            store.get(&token_id),
658            Err(CapabilityError::TokenRevoked { .. })
659        ));
660    }
661
662    #[tokio::test]
663    async fn test_clear_session() {
664        let store = CapabilityStore::in_memory();
665        let keypair = test_keypair();
666
667        let token = CapabilityToken::create(
668            ResourcePattern::exact("mcp://test:tool").unwrap(),
669            vec![Permission::Invoke],
670            TokenScope::Session,
671            keypair.key_id(),
672            AuditEntryId::new(),
673            &keypair,
674            None,
675        );
676
677        store.add(token).unwrap();
678        assert!(store.has_capability("mcp://test:tool", Permission::Invoke));
679
680        store.clear_session().unwrap();
681        assert!(!store.has_capability("mcp://test:tool", Permission::Invoke));
682    }
683
684    #[tokio::test]
685    async fn test_find_capability() {
686        let store = CapabilityStore::in_memory();
687        let keypair = test_keypair();
688
689        let token = CapabilityToken::create(
690            ResourcePattern::new("mcp://filesystem:*").unwrap(),
691            vec![Permission::Invoke],
692            TokenScope::Session,
693            keypair.key_id(),
694            AuditEntryId::new(),
695            &keypair,
696            None,
697        );
698
699        store.add(token).unwrap();
700
701        let found = store.find_capability("mcp://filesystem:read_file", Permission::Invoke);
702        assert!(found.is_some());
703
704        let not_found = store.find_capability("mcp://memory:read", Permission::Invoke);
705        assert!(not_found.is_none());
706    }
707
708    #[tokio::test(flavor = "multi_thread")]
709    async fn test_persistent_store() {
710        // Use an in-memory KvStore for testing (avoids filesystem issues).
711        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
712        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
713        let keypair = test_keypair();
714
715        let token = CapabilityToken::create(
716            ResourcePattern::exact("mcp://test:tool").unwrap(),
717            vec![Permission::Invoke],
718            TokenScope::Persistent,
719            keypair.key_id(),
720            AuditEntryId::new(),
721            &keypair,
722            None,
723        );
724
725        let token_id = token.id.clone();
726
727        store.add(token).unwrap();
728
729        // Reload store to verify persistence (same backing store).
730        drop(store);
731        let store2 = CapabilityStore::with_kv_store(kv).unwrap();
732        assert!(store2.get(&token_id).unwrap().is_some());
733        // Verify find_capability (the production lookup path) also works after reload.
734        assert!(
735            store2
736                .find_capability("mcp://test:tool", Permission::Invoke)
737                .is_some()
738        );
739
740        // Also test disk-backed store can open and store/retrieve.
741        // Note: SurrealKV holds an OS-level file lock, so we cannot drop-and-reopen
742        // the same path in a single test. The in-memory `with_kv_store` test above
743        // already validates the reload-from-backing-store pattern.
744        let temp_dir = tempfile::tempdir().unwrap();
745        let disk_store = CapabilityStore::with_persistence(temp_dir.path().join("caps")).unwrap();
746        let token2 = CapabilityToken::create(
747            ResourcePattern::exact("mcp://test:tool2").unwrap(),
748            vec![Permission::Invoke],
749            TokenScope::Persistent,
750            keypair.key_id(),
751            AuditEntryId::new(),
752            &keypair,
753            None,
754        );
755        let other_token_id = token2.id.clone();
756        disk_store.add(token2).unwrap();
757        assert!(disk_store.get(&other_token_id).unwrap().is_some());
758    }
759
760    #[tokio::test(flavor = "multi_thread")]
761    async fn test_revocation_survives_restart() {
762        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
763        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
764        let keypair = test_keypair();
765
766        let token = CapabilityToken::create(
767            ResourcePattern::exact("mcp://test:tool").unwrap(),
768            vec![Permission::Invoke],
769            TokenScope::Persistent,
770            keypair.key_id(),
771            AuditEntryId::new(),
772            &keypair,
773            None,
774        );
775
776        let token_id = token.id.clone();
777        store.add(token).unwrap();
778        store.revoke(&token_id).unwrap();
779
780        // Reload - revocation must survive.
781        drop(store);
782        let store2 = CapabilityStore::with_kv_store(kv).unwrap();
783        assert!(matches!(
784            store2.get(&token_id),
785            Err(CapabilityError::TokenRevoked { .. })
786        ));
787        assert!(!store2.has_capability("mcp://test:tool", Permission::Invoke));
788    }
789
790    #[tokio::test(flavor = "multi_thread")]
791    async fn test_mark_used_survives_restart() {
792        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
793        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
794        let keypair = test_keypair();
795
796        let token = CapabilityToken::create_with_options(
797            ResourcePattern::exact("mcp://test:tool").unwrap(),
798            vec![Permission::Invoke],
799            TokenScope::Persistent,
800            keypair.key_id(),
801            AuditEntryId::new(),
802            &keypair,
803            None,
804            true,
805        );
806
807        let token_id = token.id.clone();
808        store.add(token).unwrap();
809        store.mark_used(&token_id).unwrap();
810
811        // Reload - used state must survive.
812        drop(store);
813        let store2 = CapabilityStore::with_kv_store(kv).unwrap();
814        assert!(store2.is_used(&token_id));
815        assert!(!store2.has_capability("mcp://test:tool", Permission::Invoke));
816    }
817
818    #[tokio::test]
819    async fn test_find_capability_excludes_used_single_use() {
820        let store = CapabilityStore::in_memory();
821        let keypair = test_keypair();
822
823        let token = CapabilityToken::create_with_options(
824            ResourcePattern::exact("mcp://test:tool").unwrap(),
825            vec![Permission::Invoke],
826            TokenScope::Session,
827            keypair.key_id(),
828            AuditEntryId::new(),
829            &keypair,
830            None,
831            true,
832        );
833
834        let token_id = token.id.clone();
835        store.add(token).unwrap();
836
837        // Before marking used: both find_capability and has_capability return the token
838        assert!(
839            store
840                .find_capability("mcp://test:tool", Permission::Invoke)
841                .is_some()
842        );
843        assert!(store.has_capability("mcp://test:tool", Permission::Invoke));
844
845        // Mark the single-use token as consumed
846        store.mark_used(&token_id).unwrap();
847
848        // After marking used: both must exclude the consumed token
849        assert!(
850            store
851                .find_capability("mcp://test:tool", Permission::Invoke)
852                .is_none()
853        );
854        assert!(!store.has_capability("mcp://test:tool", Permission::Invoke));
855    }
856
857    /// Helper: create a valid persistent token, serialize it, tamper a field,
858    /// and write the corrupted bytes directly to the KV store (bypassing
859    /// `CapabilityStore::add` which validates). Returns the token ID.
860    async fn inject_tampered_persistent_token(kv: &Arc<dyn KvStore>, keypair: &KeyPair) -> TokenId {
861        let token = CapabilityToken::create(
862            ResourcePattern::exact("mcp://tampered:tool").unwrap(),
863            vec![Permission::Invoke],
864            TokenScope::Persistent,
865            keypair.key_id(),
866            AuditEntryId::new(),
867            keypair,
868            None,
869        );
870        let token_id = token.id.clone();
871
872        // Serialize, tamper a field (add an extra permission), re-serialize.
873        // The signature was computed over the original data, so it will
874        // no longer verify after tampering.
875        let mut value: serde_json::Value = serde_json::to_value(&token).unwrap();
876        value["permissions"] = serde_json::json!(["invoke", "read", "write"]);
877        let tampered_bytes = serde_json::to_vec(&value).unwrap();
878
879        kv.set(NS_TOKENS, &token_id.0.to_string(), tampered_bytes)
880            .await
881            .unwrap();
882        token_id
883    }
884
885    #[tokio::test(flavor = "multi_thread")]
886    async fn test_get_rejects_tampered_persistent_token() {
887        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
888        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
889        let keypair = test_keypair();
890
891        let token_id = inject_tampered_persistent_token(&kv, &keypair).await;
892
893        // get() should return an error for tampered tokens
894        let result = store.get(&token_id);
895        assert!(
896            matches!(result, Err(CapabilityError::InvalidSignature)),
897            "expected InvalidSignature, got {result:?}"
898        );
899    }
900
901    #[tokio::test(flavor = "multi_thread")]
902    async fn test_find_capability_skips_tampered_persistent_token() {
903        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
904        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
905        let keypair = test_keypair();
906
907        let _token_id = inject_tampered_persistent_token(&kv, &keypair).await;
908
909        // find_capability should skip tampered tokens and return None
910        assert!(
911            store
912                .find_capability("mcp://tampered:tool", Permission::Invoke)
913                .is_none()
914        );
915    }
916
917    #[tokio::test(flavor = "multi_thread")]
918    async fn test_has_capability_skips_tampered_persistent_token() {
919        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
920        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
921        let keypair = test_keypair();
922
923        let _token_id = inject_tampered_persistent_token(&kv, &keypair).await;
924
925        // has_capability should skip tampered tokens and return false
926        assert!(!store.has_capability("mcp://tampered:tool", Permission::Invoke));
927    }
928
929    #[tokio::test(flavor = "multi_thread")]
930    async fn test_find_capability_excludes_used_single_use_persistent() {
931        let kv: Arc<dyn KvStore> = Arc::new(MemoryKvStore::new());
932        let store = CapabilityStore::with_kv_store(Arc::clone(&kv)).unwrap();
933        let keypair = test_keypair();
934
935        let token = CapabilityToken::create_with_options(
936            ResourcePattern::exact("mcp://test:tool").unwrap(),
937            vec![Permission::Invoke],
938            TokenScope::Persistent,
939            keypair.key_id(),
940            AuditEntryId::new(),
941            &keypair,
942            None,
943            true,
944        );
945
946        let token_id = token.id.clone();
947        store.add(token).unwrap();
948
949        assert!(
950            store
951                .find_capability("mcp://test:tool", Permission::Invoke)
952                .is_some()
953        );
954        assert!(store.has_capability("mcp://test:tool", Permission::Invoke));
955
956        store.mark_used(&token_id).unwrap();
957
958        assert!(
959            store
960                .find_capability("mcp://test:tool", Permission::Invoke)
961                .is_none()
962        );
963        assert!(!store.has_capability("mcp://test:tool", Permission::Invoke));
964    }
965}