// calimero_context/lib.rs
1#![expect(clippy::unwrap_in_result, reason = "Repr transmute")]
2#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
3
4use std::collections::{btree_map, BTreeMap};
5use std::future::Future;
6use std::sync::Arc;
7
8use actix::Actor;
9use calimero_context_config::client::config::ClientConfig as ExternalClientConfig;
10use calimero_context_primitives::client::ContextClient;
11use calimero_node_primitives::client::NodeClient;
12use calimero_primitives::application::{Application, ApplicationId};
13use calimero_primitives::context::{Context, ContextId};
14use calimero_store::Store;
15use either::Either;
16use prometheus_client::registry::Registry;
17use tokio::sync::{Mutex, OwnedMutexGuard};
18
19use crate::metrics::Metrics;
20
21pub mod config;
22pub mod error;
23pub mod handlers;
24mod metrics;
25
/// A metadata container for a single, in-memory context.
///
/// It holds the context's core properties and an asynchronous mutex (`lock`).
/// This lock is crucial for serializing operations on this specific context,
/// allowing the `ContextManager` to process requests for different contexts in parallel
/// while ensuring data consistency for any single context.
#[derive(Debug)]
struct ContextMeta {
    // The cached context record (includes `root_hash` and `dag_heads`,
    // which `get_or_fetch_context` refreshes from the datastore).
    meta: Context,
    // Per-context async mutex, wrapped in `Arc` so that `OwnedMutexGuard`s
    // can be obtained and held across `.await` points. The guarded value is
    // just the `ContextId`; the lock itself is what serializes access.
    lock: Arc<Mutex<ContextId>>,
}
37
/// The central actor responsible for managing the lifecycle of all contexts.
///
/// As an actor, it maintains its own state and processes incoming messages
/// sequentially from a mailbox.
#[derive(Debug)]
pub struct ContextManager {
    /// Handle to the persistent key-value store. Used for fetching context data on cache misses.
    datastore: Store,

    /// Client for interacting with the underlying Calimero node.
    node_client: NodeClient,
    /// The public-facing client API, also used internally to access convenience methods
    /// for interacting with the datastore.
    context_client: ContextClient,

    /// Configuration for interacting with external blockchain contracts (e.g., NEAR).
    external_config: ExternalClientConfig,

    /// An in-memory cache of active contexts (`ContextId` -> `ContextMeta`).
    /// This serves as a hot cache to avoid expensive disk I/O for frequently accessed contexts.
    // todo! potentially make this a dashmap::DashMap
    // todo! use cached::TimedSizedCache with a gc task
    contexts: BTreeMap<ContextId, ContextMeta>,
    /// An in-memory cache of application metadata (`ApplicationId` -> `Application`).
    /// Caching this prevents repeated fetching and parsing of application details.
    ///
    /// # Note
    /// Even when 2 applications point to the same bytecode,
    /// the application's metadata may include information
    /// that might be relevant in the compilation process,
    /// so we cannot blindly reuse compiled blobs across apps.
    applications: BTreeMap<ApplicationId, Application>,

    /// Prometheus metrics for monitoring the health and performance of the manager,
    /// such as number of active contexts, message processing latency, etc.
    /// `None` when no Prometheus registry was supplied at construction time.
    metrics: Option<Metrics>,
    //
    // todo! when runtime lets us compile blobs separate from its
    // todo! execution, we can introduce a cached::TimedSizedCache
    // runtimes: TimedSizedCache<Exclusive<calimero_runtime::Engine>>,
    //
}
80
81/// Creates a new `ContextManager`.
82///
83/// # Arguments
84///
85/// * `datastore` - The persistent storage backend.
86/// * `node_client` - Client for interacting with the underlying Calimero node.
87/// * `context_client` - The context client facade.
88/// * `external_config` - Configuration for interacting with external blockchain contracts (e.g.,
89/// NEAR).
90/// * `prometheus_registry` - A mutable reference to a Prometheus registry for registering metrics.
91impl ContextManager {
92    pub fn new(
93        datastore: Store,
94        node_client: NodeClient,
95        context_client: ContextClient,
96        external_config: ExternalClientConfig,
97        prometheus_registry: Option<&mut Registry>,
98    ) -> Self {
99        Self {
100            datastore,
101            node_client,
102            context_client,
103            external_config,
104
105            contexts: BTreeMap::new(),
106            applications: BTreeMap::new(),
107
108            metrics: prometheus_registry.map(Metrics::new),
109        }
110    }
111}
112
/// Implements the `Actor` trait for `ContextManager`, allowing it to run within the Actix framework.
///
/// By implementing `Actor`, `ContextManager` gains a "Context" (an execution environment) and a mailbox.
/// Messages sent to the manager are queued in its mailbox and processed one at a time in the order
/// they are received, which is the core of the actor model's safety guarantee for its internal state.
impl Actor for ContextManager {
    // Note: `actix::Context` here is the actor's execution environment and is
    // unrelated to the Calimero `Context` domain type this actor manages.
    type Context = actix::Context<Self>;
}
121
122impl ContextMeta {
123    /// Acquires an asynchronous lock for this specific context.
124    ///
125    /// This is a performance-optimized lock acquisition strategy. It first attempts an
126    /// optimistic, non-blocking `try_lock_owned()`. This is very fast if the lock is not contended.
127    ///
128    /// # Returns
129    ///
130    /// An `Either` enum containing one of two possibilities:
131    /// - `Either::Left(OwnedMutexGuard)`: If the lock was acquired immediately without waiting.
132    /// - `Either::Right(impl Future)`: If the lock was contended. This future will resolve
133    ///    to an `OwnedMutexGuard` once the lock becomes available. The caller must `.await` this future.
134    fn lock(
135        &self,
136    ) -> Either<OwnedMutexGuard<ContextId>, impl Future<Output = OwnedMutexGuard<ContextId>>> {
137        let Ok(guard) = self.lock.clone().try_lock_owned() else {
138            return Either::Right(self.lock.clone().lock_owned());
139        };
140
141        Either::Left(guard)
142    }
143}
144
impl ContextManager {
    /// Retrieves context metadata, fetching from the datastore if not present in the cache.
    ///
    /// This function implements the "cache-aside" pattern. It first checks the in-memory
    /// `contexts` BTreeMap. On a cache miss, it falls back to querying the persistent
    /// `datastore` via the `context_client`, populates the cache with the result,
    /// and then returns the data.
    ///
    /// Note that even on a cache *hit*, the datastore is consulted to refresh
    /// `dag_heads` and `root_hash`, since those fields can be updated outside
    /// this actor (see the comment in the `Occupied` branch). This means every
    /// call performs at least one datastore read.
    ///
    /// # Arguments
    ///
    /// * `context_id` - The unique identifier of the context to retrieve.
    ///
    /// # Returns
    ///
    /// A `Result` containing an `Option` with a reference to the `ContextMeta`.
    /// Returns `Ok(Some(&ContextMeta))` if the context is found in the cache or datastore.
    /// Returns `Ok(None)` if the context does not exist in the datastore.
    /// Returns `Err` if a datastore error occurs.
    fn get_or_fetch_context(
        &mut self,
        context_id: &ContextId,
    ) -> eyre::Result<Option<&ContextMeta>> {
        let entry = self.contexts.entry(*context_id);

        match entry {
            btree_map::Entry::Occupied(mut occupied) => {
                // CRITICAL FIX: Always reload dag_heads from database to get latest state
                // The dag_heads can be updated by delta_store when receiving network deltas,
                // but the cached Context object won't reflect these changes.
                // This was causing all deltas to use genesis as parent instead of actual dag_heads.
                let handle = self.datastore.handle();
                let key = calimero_store::key::ContextMeta::new(*context_id);

                // If the record is missing from the datastore, the stale cached
                // entry is returned as-is (no eviction happens here).
                if let Some(meta) = handle.get(&key)? {
                    let cached = occupied.get_mut();

                    // Update dag_heads if they changed in DB
                    // (the comparison mainly gates the debug log; the move of
                    // `meta.dag_heads` only happens when they actually differ).
                    if cached.meta.dag_heads != meta.dag_heads {
                        tracing::debug!(
                            %context_id,
                            old_heads_count = cached.meta.dag_heads.len(),
                            new_heads_count = meta.dag_heads.len(),
                            "Refreshing dag_heads from database (cache was stale)"
                        );
                        cached.meta.dag_heads = meta.dag_heads;
                    }

                    // Also update root_hash in case it changed
                    // (done unconditionally — cheap compared to the DB read).
                    cached.meta.root_hash = meta.root_hash.into();
                }

                Ok(Some(occupied.into_mut()))
            }
            btree_map::Entry::Vacant(vacant) => {
                // Cache miss: fetch via the context client; `None` means the
                // context does not exist at all.
                let Some(context) = self.context_client.get_context(context_id)? else {
                    return Ok(None);
                };

                // Each context gets its own freshly-created async mutex; the
                // guarded value is just the id (see `ContextMeta::lock`).
                let lock = Arc::new(Mutex::new(*context_id));

                let item = vacant.insert(ContextMeta {
                    meta: context,
                    lock,
                });

                Ok(Some(item))
            }
        }
    }
}
215
216// objectives:
217//   keep up to N items, refresh entries as they are used
218//   garbage collect entries as they expire, or as needed
219//   share across tasks efficiently, not prolonging locks
220//   managed mutation, so guards aren't held for too long
221//
222// result: this should help us share data between clients
//         and their actors.
224//
225// pub struct SharedCache<K, V> {
226//     cache: DashMap<Key<K>, V>,
227//     index: ArcTimedSizedCache<K, Key<K>>,
228// }
229//
230// struct Key<K>(K);
231// struct Cached<V: Copy>(..);
232//        ^- aids read without locking
233//           downside: Copy on every write