Skip to main content

anda_core/
context.rs

1//! Execution context traits for agents and tools.
2//!
3//! This module defines the capability traits that an Anda runtime exposes to
4//! agents and tools. Context implementations provide identity, cancellation,
5//! cryptographic keys, isolated storage, caching, HTTP calls, and canister
6//! access without requiring each agent or tool to know how those services are
7//! implemented.
8//!
9//! The traits are split by capability so custom runtimes can implement only one
10//! coherent execution surface while still keeping the public API explicit:
11//!
12//! - [`BaseContext`] combines the capabilities available to agents and tools.
13//! - [`AgentContext`] extends [`BaseContext`] with completion and orchestration
14//!   features used by agents.
15//! - [`StateFeatures`], [`KeysFeatures`], [`StoreFeatures`], [`CacheFeatures`],
16//!   and [`HttpFeatures`] describe individual groups of runtime services.
17//! - [`CacheStoreFeatures`] provides convenience methods for values that should
18//!   be cached in memory and persisted to object storage.
19//!
20//! The `anda_engine` `context` module provides the default runtime
21//! implementation. Other runtimes can implement these traits for specialized
22//! environments such as tests, embedded workers, or alternative TEE backends.
23
24use async_trait::async_trait;
25use bytes::Bytes;
26use ciborium::from_reader;
27use ic_auth_types::deterministic_cbor_into_vec;
28use serde::{Deserialize, Serialize, de::DeserializeOwned};
29use std::{future::Future, sync::Arc, time::Duration};
30
31pub use anda_db_schema::Json;
32pub use candid::Principal;
33pub use ic_cose_types::CanisterCaller;
34pub use ic_oss_types::object_store::UpdateVersion;
35pub use object_store::{ObjectMeta, PutMode, PutResult, UpdateVersion as OsVersion, path::Path};
36pub use tokio_util::sync::CancellationToken;
37
38use crate::BoxError;
39use crate::model::*;
40
/// Execution environment available to agents.
///
/// `AgentContext` combines the base runtime capabilities ([`BaseContext`]) and
/// model completion ([`CompletionFeatures`]) with discovery and invocation
/// methods for local or remote agents and tools.
pub trait AgentContext: BaseContext + CompletionFeatures {
    /// Returns definitions for available local tools.
    ///
    /// # Arguments
    /// * `names` - Optional filter for specific tool names.
    ///
    /// # Returns
    /// Vector of function definitions for the requested tools.
    fn tool_definitions(&self, names: Option<&[String]>) -> Vec<FunctionDefinition>;

    /// Returns definitions for tools exposed by remote engines.
    ///
    /// # Arguments
    /// * `endpoint` - Optional filter for a specific remote engine endpoint.
    /// * `names` - Optional filter for specific tool names.
    ///
    /// # Returns
    /// Vector of function definitions for the requested tools, or an error.
    fn remote_tool_definitions(
        &self,
        endpoint: Option<&str>,
        names: Option<&[String]>,
    ) -> impl Future<Output = Result<Vec<FunctionDefinition>, BoxError>> + Send;

    /// Removes and returns resources supported by the named tool.
    ///
    /// # Arguments
    /// * `name` - Tool name.
    /// * `resources` - Candidate resources; taken by mutable reference so the
    ///   selected entries can be moved out of it.
    fn select_tool_resources(
        &self,
        name: &str,
        resources: &mut Vec<Resource>,
    ) -> impl Future<Output = Vec<Resource>> + Send;

    /// Returns definitions for available local agents.
    ///
    /// # Arguments
    /// * `names` - Optional filter for specific agent names.
    ///
    /// # Returns
    /// Vector of function definitions for the requested agents.
    fn agent_definitions(&self, names: Option<&[String]>) -> Vec<FunctionDefinition>;

    /// Returns definitions for agents exposed by remote engines.
    ///
    /// # Arguments
    /// * `endpoint` - Optional filter for a specific remote engine endpoint.
    /// * `names` - Optional filter for specific agent names.
    ///
    /// # Returns
    /// Vector of function definitions for the requested agents, or an error.
    fn remote_agent_definitions(
        &self,
        endpoint: Option<&str>,
        names: Option<&[String]>,
    ) -> impl Future<Output = Result<Vec<FunctionDefinition>, BoxError>> + Send;

    /// Removes and returns resources supported by the named agent.
    ///
    /// # Arguments
    /// * `name` - Agent name.
    /// * `resources` - Candidate resources; taken by mutable reference so the
    ///   selected entries can be moved out of it.
    fn select_agent_resources(
        &self,
        name: &str,
        resources: &mut Vec<Resource>,
    ) -> impl Future<Output = Vec<Resource>> + Send;

    /// Returns definitions for all available tools and agents, including remote ones.
    ///
    /// Unlike the `remote_*_definitions` methods this one is infallible at the
    /// type level: it yields a plain `Vec`, not a `Result`.
    ///
    /// # Arguments
    /// * `names` - Optional filter for specific tool or agent names.
    ///
    /// # Returns
    /// Vector of function definitions for the requested tools and agents.
    fn definitions(
        &self,
        names: Option<&[String]>,
    ) -> impl Future<Output = Vec<FunctionDefinition>> + Send;

    /// Executes a local tool call.
    ///
    /// # Arguments
    /// * `args` - Tool input arguments, [`ToolInput`].
    ///
    /// # Returns
    /// A tuple of the [`ToolOutput`] containing the final result and an
    /// optional [`Principal`].
    // NOTE(review): the meaning of the returned `Option<Principal>` is not
    // visible in this file (presumably the identity of the engine that served
    // the call) — confirm against the runtime implementation.
    fn tool_call(
        &self,
        args: ToolInput<Json>,
    ) -> impl Future<Output = Result<(ToolOutput<Json>, Option<Principal>), BoxError>> + Send;

    /// Runs a local agent.
    ///
    /// Takes `self` by value, so the context is consumed by the call; every
    /// other method on this trait borrows `&self`.
    ///
    /// # Arguments
    /// * `args` - Agent input arguments, [`AgentInput`].
    ///
    /// # Returns
    /// A tuple of the [`AgentOutput`] containing the result of the agent
    /// execution and an optional [`Principal`] (see the note on `tool_call`;
    /// its meaning is defined by the runtime).
    fn agent_run(
        self,
        args: AgentInput,
    ) -> impl Future<Output = Result<(AgentOutput, Option<Principal>), BoxError>> + Send;

    /// Runs a remote agent via HTTP RPC.
    ///
    /// # Arguments
    /// * `endpoint` - Remote endpoint URL.
    /// * `args` - Agent input arguments, [`AgentInput`]. The `meta` field will be set by the runtime.
    ///
    /// # Returns
    /// [`AgentOutput`] containing the result of the agent execution.
    fn remote_agent_run(
        &self,
        endpoint: &str,
        args: AgentInput,
    ) -> impl Future<Output = Result<AgentOutput, BoxError>> + Send;
}
156
/// Core execution environment available to both agents and tools.
///
/// `BaseContext` groups state ([`StateFeatures`]), cryptographic
/// ([`KeysFeatures`]), storage ([`StoreFeatures`]), caching
/// ([`CacheFeatures`]), HTTP ([`HttpFeatures`]), and ICP canister
/// ([`CanisterCaller`]) capabilities behind a single trait bound, and adds
/// remote tool invocation on top of them.
pub trait BaseContext:
    Sized + StateFeatures + KeysFeatures + StoreFeatures + CacheFeatures + HttpFeatures + CanisterCaller
{
    /// Executes a remote tool call via HTTP RPC.
    ///
    /// # Arguments
    /// * `endpoint` - Remote endpoint URL.
    /// * `args` - Tool input arguments, [`ToolInput`].
    ///
    /// # Returns
    /// [`ToolOutput`] containing the final result, or an error.
    fn remote_tool_call(
        &self,
        endpoint: &str,
        args: ToolInput<Json>,
    ) -> impl Future<Output = Result<ToolOutput<Json>, BoxError>> + Send;
}
178
/// Context metadata available during an agent or tool call.
pub trait StateFeatures: Sized {
    /// Returns the engine principal.
    fn engine_id(&self) -> &Principal;

    /// Returns the engine name.
    fn engine_name(&self) -> &str;

    /// Returns the caller principal.
    ///
    /// A principal is always returned (the type is not optional). A
    /// non-anonymous principal indicates that the request was verified
    /// using ICP blockchain's signature verification algorithm.
    /// Details: <https://github.com/ldclabs/ic-auth>
    fn caller(&self) -> &Principal;

    /// Returns metadata attached to the current request.
    fn meta(&self) -> &RequestMeta;

    /// Returns the cancellation token for the current execution context.
    ///
    /// Each call level has its own token scope.
    /// For example, when an agent calls a tool, the tool receives
    /// a child token of the agent's token.
    /// Cancelling the agent token cancels all child calls, while cancelling a
    /// child token does not affect the parent context.
    fn cancellation_token(&self) -> CancellationToken;

    /// Returns the time elapsed since the context was created.
    fn time_elapsed(&self) -> Duration;
}
207
/// Cryptographic key operations available to agents and tools.
///
/// Runtime implementations derive isolated AES, Ed25519, and Secp256k1 keys
/// from their root key material. The active agent or tool namespace is included
/// in derivation paths so identical user-supplied paths remain isolated across
/// components.
pub trait KeysFeatures: Sized {
    /// Derives a 256-bit AES-GCM key from the given derivation path.
    ///
    /// # Returns
    /// The 32-byte key.
    fn a256gcm_key(
        &self,
        derivation_path: Vec<Vec<u8>>,
    ) -> impl Future<Output = Result<[u8; 32], BoxError>> + Send;

    /// Signs a message using Ed25519 signature scheme from the given derivation path.
    ///
    /// # Returns
    /// The 64-byte signature.
    fn ed25519_sign_message(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message: &[u8],
    ) -> impl Future<Output = Result<[u8; 64], BoxError>> + Send;

    /// Verifies an Ed25519 signature from the given derivation path.
    ///
    /// # Returns
    /// `Ok(())` when the signature is accepted, an error otherwise.
    fn ed25519_verify(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message: &[u8],
        signature: &[u8],
    ) -> impl Future<Output = Result<(), BoxError>> + Send;

    /// Returns the Ed25519 public key for the given derivation path.
    fn ed25519_public_key(
        &self,
        derivation_path: Vec<Vec<u8>>,
    ) -> impl Future<Output = Result<[u8; 32], BoxError>> + Send;

    /// Signs a message using Secp256k1 BIP340 Schnorr signature from the given derivation path.
    ///
    /// # Returns
    /// The 64-byte signature.
    fn secp256k1_sign_message_bip340(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message: &[u8],
    ) -> impl Future<Output = Result<[u8; 64], BoxError>> + Send;

    /// Verifies a Secp256k1 BIP340 Schnorr signature from the given derivation path.
    ///
    /// # Returns
    /// `Ok(())` when the signature is accepted, an error otherwise.
    fn secp256k1_verify_bip340(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message: &[u8],
        signature: &[u8],
    ) -> impl Future<Output = Result<(), BoxError>> + Send;

    /// Signs a message using Secp256k1 ECDSA from the given derivation path.
    /// The message will be hashed with SHA-256 before signing.
    ///
    /// Use [`KeysFeatures::secp256k1_sign_digest_ecdsa`] when the digest has
    /// already been computed.
    fn secp256k1_sign_message_ecdsa(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message: &[u8],
    ) -> impl Future<Output = Result<[u8; 64], BoxError>> + Send;

    /// Signs a pre-computed message digest using Secp256k1 ECDSA from the given derivation path.
    ///
    /// `message_hash` is signed as supplied.
    // NOTE(review): presumably a 32-byte SHA-256 digest, matching what
    // `secp256k1_sign_message_ecdsa` computes — confirm with the runtime.
    fn secp256k1_sign_digest_ecdsa(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message_hash: &[u8],
    ) -> impl Future<Output = Result<[u8; 64], BoxError>> + Send;

    /// Verifies a Secp256k1 ECDSA signature from the given derivation path.
    ///
    /// Note that this takes the message digest (`message_hash`), not the raw
    /// message.
    ///
    /// # Returns
    /// `Ok(())` when the signature is accepted, an error otherwise.
    fn secp256k1_verify_ecdsa(
        &self,
        derivation_path: Vec<Vec<u8>>,
        message_hash: &[u8],
        signature: &[u8],
    ) -> impl Future<Output = Result<(), BoxError>> + Send;

    /// Returns the compressed SEC1-encoded Secp256k1 public key for the given derivation path.
    fn secp256k1_public_key(
        &self,
        derivation_path: Vec<Vec<u8>>,
    ) -> impl Future<Output = Result<[u8; 33], BoxError>> + Send;
}
286
/// Persistent object storage available to agents and tools.
///
/// Provides persistent storage capabilities for Agents and Tools to store and manage data.
/// All operations are asynchronous and report failures as [`BoxError`].
pub trait StoreFeatures: Sized {
    /// Retrieves data from storage at the specified path.
    ///
    /// # Returns
    /// The object's bytes together with its [`ObjectMeta`].
    fn store_get(
        &self,
        path: &Path,
    ) -> impl Future<Output = Result<(bytes::Bytes, ObjectMeta), BoxError>> + Send;

    /// Lists objects in storage with a prefix filter and a start offset.
    ///
    /// # Arguments
    /// * `prefix` - Optional path prefix to filter results.
    /// * `offset` - Path to start listing from (exclusive). This argument is
    ///   required by the signature; it is not an `Option`.
    // NOTE(review): how callers express "no offset" is not visible here
    // (presumably an empty/default `Path`) — confirm with implementations.
    fn store_list(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> impl Future<Output = Result<Vec<ObjectMeta>, BoxError>> + Send;

    /// Stores data at the specified path with a given write mode.
    ///
    /// # Arguments
    /// * `path` - Target storage path.
    /// * `mode` - Write mode (Create, Overwrite, etc.).
    /// * `value` - Data to store as bytes.
    ///
    /// # Returns
    /// The [`PutResult`] describing the stored object (its `e_tag` and `version`).
    fn store_put(
        &self,
        path: &Path,
        mode: PutMode,
        value: bytes::Bytes,
    ) -> impl Future<Output = Result<PutResult, BoxError>> + Send;

    /// Renames a storage object if the target path doesn't exist.
    ///
    /// # Arguments
    /// * `from` - Source path.
    /// * `to` - Destination path.
    fn store_rename_if_not_exists(
        &self,
        from: &Path,
        to: &Path,
    ) -> impl Future<Output = Result<(), BoxError>> + Send;

    /// Deletes data at the specified path.
    ///
    /// # Arguments
    /// * `path` - Path of the object to delete.
    fn store_delete(&self, path: &Path) -> impl Future<Output = Result<(), BoxError>> + Send;
}
339
/// Cache expiration policy for cached items.
///
/// Implements `PartialEq`/`Eq` so policies can be compared directly, for
/// example in assertions or when deciding whether an entry's policy changed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CacheExpiry {
    /// Time-to-Live: Entry expires after duration from when it was set.
    TTL(Duration),
    /// Time-to-Idle: Entry expires after duration from last access.
    TTI(Duration),
}
348
/// In-memory cache storage available to agents and tools.
///
/// Provides isolated in-memory cache storage with TTL/TTI expiration.
/// Cache data is ephemeral and will be lost on engine restart.
pub trait CacheFeatures: Sized {
    /// Checks if a key exists in the cache.
    fn cache_contains(&self, key: &str) -> bool;

    /// Gets a cached value by key, returns error if not found or deserialization fails.
    fn cache_get<T>(&self, key: &str) -> impl Future<Output = Result<T, BoxError>> + Send
    where
        T: DeserializeOwned;

    /// Gets a cached value or initializes it if missing.
    ///
    /// If the key doesn't exist, the `init` future is awaited to produce the
    /// value and its optional expiry policy, and the value is cached. `init`
    /// is a future rather than a closure.
    fn cache_get_with<T, F>(
        &self,
        key: &str,
        init: F,
    ) -> impl Future<Output = Result<T, BoxError>> + Send
    where
        T: Sized + DeserializeOwned + Serialize + Send,
        F: Future<Output = Result<(T, Option<CacheExpiry>), BoxError>> + Send + 'static;

    /// Sets a value in cache with optional expiration policy.
    ///
    /// # Arguments
    /// * `key` - Cache key.
    /// * `val` - The value paired with its optional [`CacheExpiry`].
    ///
    /// The returned future yields `()`; no error is reported through the
    /// signature.
    fn cache_set<T>(
        &self,
        key: &str,
        val: (T, Option<CacheExpiry>),
    ) -> impl Future<Output = ()> + Send
    where
        T: Sized + Serialize + Send;

    /// Sets a value in cache if key doesn't exist, returns true if set.
    ///
    /// # Arguments
    /// * `key` - Cache key.
    /// * `val` - The value paired with its optional [`CacheExpiry`].
    fn cache_set_if_not_exists<T>(
        &self,
        key: &str,
        val: (T, Option<CacheExpiry>),
    ) -> impl Future<Output = bool> + Send
    where
        T: Sized + Serialize + Send;

    /// Deletes a cached value by key, returns true if key existed.
    fn cache_delete(&self, key: &str) -> impl Future<Output = bool> + Send;

    /// Returns an iterator over all cached items with raw value.
    ///
    /// Each item is the key and a shared pointer to the raw stored bytes
    /// together with the entry's optional [`CacheExpiry`].
    fn cache_raw_iter(
        &self,
    ) -> impl Iterator<Item = (Arc<String>, Arc<(Bytes, Option<CacheExpiry>)>)>;
}
400
/// HTTP request capabilities available to agents and tools.
///
/// All HTTP requests are managed and scheduled by the runtime. Since agents may
/// run in WASM containers, implementations should not
/// implement HTTP requests directly.
pub trait HttpFeatures: Sized {
    /// Makes an HTTPS request.
    ///
    /// # Arguments
    /// * `url` - Target URL, should start with `https://`.
    /// * `method` - HTTP method (GET, POST, etc.).
    /// * `headers` - Optional HTTP headers.
    /// * `body` - Optional request body (default empty).
    ///
    /// # Returns
    /// The raw [`reqwest::Response`], or an error.
    fn https_call(
        &self,
        url: &str,
        method: http::Method,
        headers: Option<http::HeaderMap>,
        body: Option<Vec<u8>>, // default is empty
    ) -> impl Future<Output = Result<reqwest::Response, BoxError>> + Send;

    /// Makes a signed HTTPS request with message authentication.
    ///
    /// # Arguments
    /// * `url` - Target URL.
    /// * `method` - HTTP method (GET, POST, etc.).
    /// * `message_digest` - 32-byte message digest for signing.
    /// * `headers` - Optional HTTP headers.
    /// * `body` - Optional request body (default empty).
    ///
    /// # Returns
    /// The raw [`reqwest::Response`], or an error.
    fn https_signed_call(
        &self,
        url: &str,
        method: http::Method,
        message_digest: [u8; 32],
        headers: Option<http::HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> impl Future<Output = Result<reqwest::Response, BoxError>> + Send;

    /// Makes a signed CBOR-encoded RPC call.
    ///
    /// # Arguments
    /// * `endpoint` - URL endpoint to send the request to.
    /// * `method` - RPC method name to call.
    /// * `args` - Arguments to serialize as CBOR and send with the request.
    ///
    /// # Returns
    /// The response deserialized into `T`, or an error.
    fn https_signed_rpc<T>(
        &self,
        endpoint: &str,
        method: &str,
        args: impl Serialize + Send,
    ) -> impl Future<Output = Result<T, BoxError>> + Send
    where
        T: DeserializeOwned;
}
454
/// Cache entry used by [`CacheStoreFeatures`]: the value (field `0`) paired
/// with the storage [`UpdateVersion`] (field `1`) it was loaded from or
/// written with.
#[derive(Clone, Deserialize, Serialize)]
struct CacheStoreValue<T>(T, UpdateVersion);
457
458/// Convenience methods for values backed by both cache and object storage.
459#[async_trait]
460pub trait CacheStoreFeatures: StoreFeatures + CacheFeatures + Send + Sync + 'static {
461    /// Initializes a cached value from storage, or creates it with `init` if missing.
462    async fn cache_store_init<T, F>(&self, key: &str, init: F) -> Result<(), BoxError>
463    where
464        T: DeserializeOwned + Serialize + Send,
465        F: Future<Output = Result<T, BoxError>> + Send + 'static,
466    {
467        let p = Path::from(key);
468        match self.store_get(&p).await {
469            Ok((v, meta)) => {
470                let val: T = from_reader(&v[..])?;
471                self.cache_set(
472                    key,
473                    (
474                        CacheStoreValue(
475                            val,
476                            UpdateVersion {
477                                e_tag: meta.e_tag,
478                                version: meta.version,
479                            },
480                        ),
481                        None,
482                    ),
483                )
484                .await;
485                Ok(())
486            }
487            Err(_) => {
488                let val: T = init.await?;
489                let data = deterministic_cbor_into_vec(&val)?;
490                let res = self.store_put(&p, PutMode::Create, data.into()).await?;
491                self.cache_set(
492                    key,
493                    (
494                        CacheStoreValue(
495                            val,
496                            UpdateVersion {
497                                e_tag: res.e_tag,
498                                version: res.version,
499                            },
500                        ),
501                        None,
502                    ),
503                )
504                .await;
505                Ok(())
506            }
507        }
508    }
509
510    /// Returns a value and its storage version, loading it into cache if needed.
511    async fn cache_store_get<T>(&self, key: &str) -> Result<(T, UpdateVersion), BoxError>
512    where
513        T: DeserializeOwned + Serialize + Send + Sync,
514    {
515        match self.cache_get::<CacheStoreValue<T>>(key).await {
516            Ok(CacheStoreValue(val, ver)) => Ok((val, ver)),
517            Err(_) => {
518                // fetch from store and set in cache
519                let p = Path::from(key);
520                let (v, meta) = self.store_get(&p).await?;
521                let val: T = from_reader(&v[..])?;
522                let version = UpdateVersion {
523                    e_tag: meta.e_tag,
524                    version: meta.version,
525                };
526                self.cache_set(key, (CacheStoreValue(&val, version.clone()), None))
527                    .await;
528                Ok((val, version))
529            }
530        }
531    }
532
533    /// Persists a value to storage and updates the cache on success.
534    ///
535    /// When `version` is provided, the write uses an atomic update against that
536    /// storage version. Without a version, the value is written with overwrite
537    /// semantics.
538    async fn cache_store_set<T>(
539        &self,
540        key: &str,
541        val: T,
542        version: Option<UpdateVersion>,
543    ) -> Result<UpdateVersion, BoxError>
544    where
545        T: DeserializeOwned + Serialize + Send,
546    {
547        let data = deterministic_cbor_into_vec(&val)?;
548        let p = Path::from(key);
549        if let Some(ver) = version {
550            // atomic update
551            let res = self
552                .store_put(
553                    &p,
554                    PutMode::Update(OsVersion {
555                        e_tag: ver.e_tag.clone(),
556                        version: ver.version.clone(),
557                    }),
558                    data.into(),
559                )
560                .await?;
561            // we can set the cache value after atomic update succeeded
562            let ver = UpdateVersion {
563                e_tag: res.e_tag,
564                version: res.version,
565            };
566            self.cache_set(key, (CacheStoreValue(val, ver.clone()), None))
567                .await;
568            Ok(ver)
569        } else {
570            let res = self.store_put(&p, PutMode::Overwrite, data.into()).await?;
571            let ver = UpdateVersion {
572                e_tag: res.e_tag,
573                version: res.version,
574            };
575            self.cache_set(key, (CacheStoreValue(val, ver.clone()), None))
576                .await;
577            Ok(ver)
578        }
579    }
580
581    /// Deletes a value from both cache and storage.
582    async fn cache_store_delete(&self, key: &str) -> Result<(), BoxError> {
583        let p = Path::from(key);
584        self.cache_delete(key).await;
585        self.store_delete(&p).await
586    }
587}
588
589/// Prefixes a derivation path with the current context path.
590pub fn derivation_path_with(path: &Path, derivation_path: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
591    let mut dp = Vec::with_capacity(derivation_path.len() + 1);
592    dp.push(path.as_ref().as_bytes().to_vec());
593    dp.extend(derivation_path);
594    dp
595}
596
597#[cfg(test)]
598mod tests {
599    use super::*;
600    use futures::executor::block_on;
601    use std::{
602        collections::BTreeMap,
603        sync::{
604            Arc, Mutex,
605            atomic::{AtomicUsize, Ordering},
606        },
607    };
608
    /// Raw cache entry: serialized value bytes plus the optional expiry policy.
    type TestCacheValue = Arc<(Bytes, Option<CacheExpiry>)>;
    /// Cache contents keyed by cache key.
    type TestCacheMap = BTreeMap<String, TestCacheValue>;

    /// In-memory test double implementing the cache and store traits.
    #[derive(Default)]
    struct TestCacheStore {
        /// Cache layer: key -> serialized value and expiry.
        cache: Mutex<TestCacheMap>,
        /// Store layer: path string -> stored bytes and their version.
        store: Mutex<BTreeMap<String, (Bytes, UpdateVersion)>>,
        /// Number of `store_get` calls, so tests can assert on cache hits.
        store_gets: AtomicUsize,
        /// Counter used by `next_version` to mint fresh versions.
        versions: AtomicUsize,
    }
619
620    impl TestCacheStore {
621        fn put_serialized(&self, key: &str, value: Vec<u8>, version: UpdateVersion) {
622            self.store
623                .lock()
624                .unwrap()
625                .insert(key.to_string(), (value.into(), version));
626        }
627
628        fn next_version(&self) -> UpdateVersion {
629            let version = self.versions.fetch_add(1, Ordering::SeqCst) + 1;
630            UpdateVersion {
631                e_tag: Some(format!("etag-{version}")),
632                version: Some(version.to_string()),
633            }
634        }
635    }
636
637    impl CacheFeatures for TestCacheStore {
638        fn cache_contains(&self, key: &str) -> bool {
639            self.cache.lock().unwrap().contains_key(key)
640        }
641
642        async fn cache_get<T>(&self, key: &str) -> Result<T, BoxError>
643        where
644            T: DeserializeOwned,
645        {
646            let value = self
647                .cache
648                .lock()
649                .unwrap()
650                .get(key)
651                .cloned()
652                .ok_or_else(|| format!("key {key} not found"))?;
653            from_reader(&value.0[..]).map_err(|err| err.into())
654        }
655
656        async fn cache_get_with<T, F>(&self, key: &str, init: F) -> Result<T, BoxError>
657        where
658            T: Sized + DeserializeOwned + Serialize + Send,
659            F: Future<Output = Result<(T, Option<CacheExpiry>), BoxError>> + Send + 'static,
660        {
661            if let Some(value) = self.cache.lock().unwrap().get(key).cloned() {
662                return from_reader(&value.0[..]).map_err(|err| err.into());
663            }
664
665            let (value, expiry) = init.await?;
666            let data = deterministic_cbor_into_vec(&value)?;
667            self.cache
668                .lock()
669                .unwrap()
670                .insert(key.to_string(), Arc::new((data.into(), expiry)));
671            Ok(value)
672        }
673
674        async fn cache_set<T>(&self, key: &str, val: (T, Option<CacheExpiry>))
675        where
676            T: Sized + Serialize + Send,
677        {
678            let data = deterministic_cbor_into_vec(&val.0).unwrap();
679            self.cache
680                .lock()
681                .unwrap()
682                .insert(key.to_string(), Arc::new((data.into(), val.1)));
683        }
684
685        async fn cache_set_if_not_exists<T>(&self, key: &str, val: (T, Option<CacheExpiry>)) -> bool
686        where
687            T: Sized + Serialize + Send,
688        {
689            let mut cache = self.cache.lock().unwrap();
690            if cache.contains_key(key) {
691                return false;
692            }
693
694            let data = deterministic_cbor_into_vec(&val.0).unwrap();
695            cache.insert(key.to_string(), Arc::new((data.into(), val.1)));
696            true
697        }
698
699        async fn cache_delete(&self, key: &str) -> bool {
700            self.cache.lock().unwrap().remove(key).is_some()
701        }
702
703        fn cache_raw_iter(
704            &self,
705        ) -> impl Iterator<Item = (Arc<String>, Arc<(Bytes, Option<CacheExpiry>)>)> {
706            self.cache
707                .lock()
708                .unwrap()
709                .iter()
710                .map(|(key, value)| (Arc::new(key.clone()), value.clone()))
711                .collect::<Vec<_>>()
712                .into_iter()
713        }
714    }
715
716    impl StoreFeatures for TestCacheStore {
717        async fn store_get(&self, path: &Path) -> Result<(bytes::Bytes, ObjectMeta), BoxError> {
718            self.store_gets.fetch_add(1, Ordering::SeqCst);
719            let (value, version) = self
720                .store
721                .lock()
722                .unwrap()
723                .get(path.as_ref())
724                .cloned()
725                .ok_or_else(|| format!("path {path} not found"))?;
726
727            Ok((
728                value.clone(),
729                ObjectMeta {
730                    location: path.clone(),
731                    last_modified: chrono::Utc::now(),
732                    size: value.len() as u64,
733                    e_tag: version.e_tag,
734                    version: version.version,
735                },
736            ))
737        }
738
739        async fn store_list(
740            &self,
741            _prefix: Option<&Path>,
742            _offset: &Path,
743        ) -> Result<Vec<ObjectMeta>, BoxError> {
744            Ok(Vec::new())
745        }
746
747        async fn store_put(
748            &self,
749            path: &Path,
750            mode: PutMode,
751            value: bytes::Bytes,
752        ) -> Result<PutResult, BoxError> {
753            let key = path.as_ref().to_string();
754            let mut store = self.store.lock().unwrap();
755            match mode {
756                PutMode::Create if store.contains_key(&key) => {
757                    return Err(format!("path {path} already exists").into());
758                }
759                PutMode::Update(expected) => {
760                    let Some((_, current)) = store.get(&key) else {
761                        return Err(format!("path {path} not found").into());
762                    };
763                    if current.e_tag != expected.e_tag || current.version != expected.version {
764                        return Err(format!("path {path} version mismatch").into());
765                    }
766                }
767                _ => {}
768            }
769
770            let version = self.next_version();
771            store.insert(key, (value, version.clone()));
772            Ok(PutResult {
773                e_tag: version.e_tag,
774                version: version.version,
775            })
776        }
777
778        async fn store_rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<(), BoxError> {
779            let mut store = self.store.lock().unwrap();
780            let to = to.as_ref().to_string();
781            if store.contains_key(&to) {
782                return Err(format!("path {to} already exists").into());
783            }
784            let value = store
785                .remove(from.as_ref())
786                .ok_or_else(|| format!("path {from} not found"))?;
787            store.insert(to, value);
788            Ok(())
789        }
790
791        async fn store_delete(&self, path: &Path) -> Result<(), BoxError> {
792            self.store.lock().unwrap().remove(path.as_ref());
793            Ok(())
794        }
795    }
796
    // Empty impl: the test double relies entirely on the trait's provided
    // method bodies, which is what the tests below exercise.
    impl CacheStoreFeatures for TestCacheStore {}
798
799    #[test]
800    fn cache_store_get_populates_cache_without_second_store_read() {
801        let ctx = TestCacheStore::default();
802        let stored_version = UpdateVersion {
803            e_tag: Some("etag-stored".to_string()),
804            version: Some("1".to_string()),
805        };
806        let data = deterministic_cbor_into_vec(&123_u32).unwrap();
807        ctx.put_serialized("answer", data, stored_version.clone());
808
809        let (value, version) = block_on(ctx.cache_store_get::<u32>("answer")).unwrap();
810        assert_eq!(value, 123);
811        assert_eq!(version.e_tag, stored_version.e_tag);
812        assert_eq!(version.version, stored_version.version);
813        assert_eq!(ctx.store_gets.load(Ordering::SeqCst), 1);
814
815        let (value, _) = block_on(ctx.cache_store_get::<u32>("answer")).unwrap();
816        assert_eq!(value, 123);
817        assert_eq!(ctx.store_gets.load(Ordering::SeqCst), 1);
818    }
819
820    #[test]
821    fn cache_store_set_overwrite_updates_cache() {
822        let ctx = TestCacheStore::default();
823
824        let version = block_on(ctx.cache_store_set("answer", 42_u32, None)).unwrap();
825        assert_eq!(ctx.store_gets.load(Ordering::SeqCst), 0);
826
827        let (value, cached_version) = block_on(ctx.cache_store_get::<u32>("answer")).unwrap();
828        assert_eq!(value, 42);
829        assert_eq!(cached_version.e_tag, version.e_tag);
830        assert_eq!(cached_version.version, version.version);
831        assert_eq!(ctx.store_gets.load(Ordering::SeqCst), 0);
832    }
833
834    #[test]
835    fn cache_store_init_loads_existing_value_and_skips_initializer() {
836        let ctx = TestCacheStore::default();
837        let stored_version = UpdateVersion {
838            e_tag: Some("etag-existing".to_string()),
839            version: Some("7".to_string()),
840        };
841        let data = deterministic_cbor_into_vec(&"stored".to_string()).unwrap();
842        ctx.put_serialized("message", data, stored_version.clone());
843
844        block_on(ctx.cache_store_init("message", async {
845            Err::<String, BoxError>("initializer should not run".into())
846        }))
847        .unwrap();
848
849        let (value, version) = block_on(ctx.cache_store_get::<String>("message")).unwrap();
850        assert_eq!(value, "stored");
851        assert_eq!(version.e_tag, stored_version.e_tag);
852        assert_eq!(version.version, stored_version.version);
853        assert_eq!(ctx.store_gets.load(Ordering::SeqCst), 1);
854    }
855
856    #[test]
857    fn cache_store_init_creates_missing_value_and_delete_clears_layers() {
858        let ctx = TestCacheStore::default();
859
860        block_on(ctx.cache_store_init("message", async {
861            Ok::<_, BoxError>("created".to_string())
862        }))
863        .unwrap();
864        assert!(ctx.cache_contains("message"));
865        assert!(ctx.store.lock().unwrap().contains_key("message"));
866
867        let (value, _) = block_on(ctx.cache_store_get::<String>("message")).unwrap();
868        assert_eq!(value, "created");
869
870        block_on(ctx.cache_store_delete("message")).unwrap();
871        assert!(!ctx.cache_contains("message"));
872        assert!(!ctx.store.lock().unwrap().contains_key("message"));
873    }
874
875    #[test]
876    fn cache_store_set_update_enforces_expected_version() {
877        let ctx = TestCacheStore::default();
878
879        let version = block_on(ctx.cache_store_set("answer", 1_u32, None)).unwrap();
880        let updated = block_on(ctx.cache_store_set("answer", 2_u32, Some(version))).unwrap();
881        let (value, cached_version) = block_on(ctx.cache_store_get::<u32>("answer")).unwrap();
882        assert_eq!(value, 2);
883        assert_eq!(cached_version.version, updated.version);
884
885        let err = block_on(ctx.cache_store_set(
886            "answer",
887            3_u32,
888            Some(UpdateVersion {
889                e_tag: Some("wrong".to_string()),
890                version: Some("wrong".to_string()),
891            }),
892        ))
893        .unwrap_err();
894        assert!(err.to_string().contains("version mismatch"));
895    }
896
897    #[test]
898    fn cache_and_store_mock_helpers_cover_absent_existing_and_error_paths() {
899        let ctx = TestCacheStore::default();
900
901        let ttl = CacheExpiry::TTL(Duration::from_secs(5));
902        block_on(ctx.cache_set("ttl", ("one".to_string(), Some(ttl.clone()))));
903        assert!(ctx.cache_contains("ttl"));
904
905        let existing = block_on(ctx.cache_set_if_not_exists(
906            "ttl",
907            (
908                "two".to_string(),
909                Some(CacheExpiry::TTI(Duration::from_secs(9))),
910            ),
911        ));
912        assert!(!existing);
913
914        let inserted = block_on(ctx.cache_set_if_not_exists(
915            "tti",
916            (
917                "three".to_string(),
918                Some(CacheExpiry::TTI(Duration::from_secs(9))),
919            ),
920        ));
921        assert!(inserted);
922
923        let mut seen = ctx
924            .cache_raw_iter()
925            .map(|(key, value)| (key.to_string(), value.1.clone()))
926            .collect::<Vec<_>>();
927        seen.sort_by(|a, b| a.0.cmp(&b.0));
928        assert_eq!(seen.len(), 2);
929        let ttl_expiry = seen
930            .iter()
931            .find(|(key, _)| key == "ttl")
932            .and_then(|(_, expiry)| expiry.as_ref())
933            .unwrap();
934        let tti_expiry = seen
935            .iter()
936            .find(|(key, _)| key == "tti")
937            .and_then(|(_, expiry)| expiry.as_ref())
938            .unwrap();
939        match ttl_expiry {
940            CacheExpiry::TTL(duration) => assert_eq!(*duration, Duration::from_secs(5)),
941            CacheExpiry::TTI(_) => panic!("expected ttl"),
942        }
943        match tti_expiry {
944            CacheExpiry::TTI(duration) => assert_eq!(*duration, Duration::from_secs(9)),
945            CacheExpiry::TTL(_) => panic!("expected tti"),
946        }
947
948        let value =
949            block_on(ctx.cache_get_with("lazy", async { Ok::<_, BoxError>((99_u32, None)) }))
950                .unwrap();
951        assert_eq!(value, 99);
952        let cached =
953            block_on(ctx.cache_get_with("lazy", async { Ok::<_, BoxError>((100_u32, None)) }))
954                .unwrap();
955        assert_eq!(cached, 99);
956
957        let first =
958            block_on(ctx.store_put(&Path::from("created"), PutMode::Create, Bytes::from("a")))
959                .unwrap();
960        assert!(first.version.is_some());
961        let err =
962            block_on(ctx.store_put(&Path::from("created"), PutMode::Create, Bytes::from("b")))
963                .unwrap_err();
964        assert!(err.to_string().contains("already exists"));
965
966        block_on(ctx.store_rename_if_not_exists(&Path::from("created"), &Path::from("renamed")))
967            .unwrap();
968        assert!(ctx.store.lock().unwrap().contains_key("renamed"));
969        let err = block_on(
970            ctx.store_rename_if_not_exists(&Path::from("missing"), &Path::from("renamed")),
971        )
972        .unwrap_err();
973        assert!(err.to_string().contains("already exists"));
974        let err = block_on(
975            ctx.store_rename_if_not_exists(&Path::from("missing"), &Path::from("new-destination")),
976        )
977        .unwrap_err();
978        assert!(err.to_string().contains("not found"));
979
980        let listed =
981            block_on(ctx.store_list(Some(&Path::from("r")), &Path::from("renamed"))).unwrap();
982        assert!(listed.is_empty());
983    }
984
985    #[test]
986    fn derivation_path_with_prefixes_current_path() {
987        let path = Path::from("agent/main");
988        let derivation_path = derivation_path_with(&path, vec![b"child".to_vec()]);
989        assert_eq!(
990            derivation_path,
991            vec![b"agent/main".to_vec(), b"child".to_vec()]
992        );
993    }
994}