// calimero_context/lib.rs
1#![expect(clippy::unwrap_in_result, reason = "Repr transmute")]
2#![allow(clippy::multiple_inherent_impl, reason = "better readability")]
3
4use std::collections::{btree_map, BTreeMap};
5use std::future::Future;
6use std::sync::Arc;
7
8use actix::Actor;
9use calimero_context_config::client::config::ClientConfig as ExternalClientConfig;
10use calimero_context_primitives::client::ContextClient;
11use calimero_node_primitives::client::NodeClient;
12use calimero_primitives::application::{Application, ApplicationId};
13use calimero_primitives::context::{Context, ContextId};
14use calimero_store::Store;
15use either::Either;
16use prometheus_client::registry::Registry;
17use tokio::sync::{Mutex, OwnedMutexGuard};
18
19use crate::metrics::Metrics;
20
21pub mod config;
22pub mod error;
23pub mod handlers;
24mod metrics;
25
/// A metadata container for a single, in-memory context.
///
/// It holds the context's core properties and an asynchronous mutex (`lock`).
/// This lock is crucial for serializing operations on this specific context,
/// allowing the `ContextManager` to process requests for different contexts in parallel
/// while ensuring data consistency for any single context.
#[derive(Debug)]
struct ContextMeta {
    /// The cached context record (e.g. `root_hash`, `dag_heads`) as last read from
    /// the datastore; refreshed on access by `ContextManager::get_or_fetch_context`.
    meta: Context,
    /// Per-context async mutex. Wrapped in an `Arc` so owned guards can be taken
    /// and held across `.await` points independently of this cache entry's lifetime.
    lock: Arc<Mutex<ContextId>>,
}
37
/// The central actor responsible for managing the lifecycle of all contexts.
///
/// As an actor, it maintains its own state and processes incoming messages
/// sequentially from a mailbox.
#[derive(Debug)]
pub struct ContextManager {
    /// Handle to the persistent key-value store. Used for fetching context data on cache misses.
    datastore: Store,

    /// Client for interacting with the underlying Calimero node.
    node_client: NodeClient,
    /// The public-facing client API, also used internally to access convenience methods
    /// for interacting with the datastore.
    context_client: ContextClient,

    /// Configuration for interacting with external blockchain contracts (e.g., NEAR).
    external_config: ExternalClientConfig,

    /// An in-memory cache of active contexts (`ContextId` -> `ContextMeta`).
    /// This serves as a hot cache to avoid expensive disk I/O for frequently accessed contexts.
    // todo! potentially make this a dashmap::DashMap
    // todo! use cached::TimedSizedCache with a gc task
    contexts: BTreeMap<ContextId, ContextMeta>,
    /// An in-memory cache of application metadata (`ApplicationId` -> `Application`).
    /// Caching this prevents repeated fetching and parsing of application details.
    ///
    /// # Note
    /// Even when 2 applications point to the same bytecode,
    /// the application's metadata may include information
    /// that might be relevant in the compilation process,
    /// so we cannot blindly reuse compiled blobs across apps.
    applications: BTreeMap<ApplicationId, Application>,

    /// Prometheus metrics for monitoring the health and performance of the manager,
    /// such as number of active contexts, message processing latency, etc.
    /// `None` when no Prometheus registry was supplied at construction.
    metrics: Option<Metrics>,
    //
    // todo! when runtime lets us compile blobs separate from its
    // todo! execution, we can introduce a cached::TimedSizedCache
    // runtimes: TimedSizedCache<Exclusive<calimero_runtime::Engine>>,
    //
}
80
81/// Creates a new `ContextManager`.
82///
83/// # Arguments
84///
85/// * `datastore` - The persistent storage backend.
86/// * `node_client` - Client for interacting with the underlying Calimero node.
87/// * `context_client` - The context client facade.
88/// * `external_config` - Configuration for interacting with external blockchain contracts (e.g.,
89/// NEAR).
90/// * `prometheus_registry` - A mutable reference to a Prometheus registry for registering metrics.
91impl ContextManager {
92 pub fn new(
93 datastore: Store,
94 node_client: NodeClient,
95 context_client: ContextClient,
96 external_config: ExternalClientConfig,
97 prometheus_registry: Option<&mut Registry>,
98 ) -> Self {
99 Self {
100 datastore,
101 node_client,
102 context_client,
103 external_config,
104
105 contexts: BTreeMap::new(),
106 applications: BTreeMap::new(),
107
108 metrics: prometheus_registry.map(Metrics::new),
109 }
110 }
111}
112
/// Implements the `Actor` trait for `ContextManager`, allowing it to run within the Actix framework.
///
/// By implementing `Actor`, `ContextManager` gains a "Context" (an execution environment) and a mailbox.
/// Messages sent to the manager are queued in its mailbox and processed one at a time in the order
/// they are received, which is the core of the actor model's safety guarantee for its internal state.
impl Actor for ContextManager {
    // Default Actix execution context; no custom mailbox or supervision is configured.
    type Context = actix::Context<Self>;
}
121
122impl ContextMeta {
123 /// Acquires an asynchronous lock for this specific context.
124 ///
125 /// This is a performance-optimized lock acquisition strategy. It first attempts an
126 /// optimistic, non-blocking `try_lock_owned()`. This is very fast if the lock is not contended.
127 ///
128 /// # Returns
129 ///
130 /// An `Either` enum containing one of two possibilities:
131 /// - `Either::Left(OwnedMutexGuard)`: If the lock was acquired immediately without waiting.
132 /// - `Either::Right(impl Future)`: If the lock was contended. This future will resolve
133 /// to an `OwnedMutexGuard` once the lock becomes available. The caller must `.await` this future.
134 fn lock(
135 &self,
136 ) -> Either<OwnedMutexGuard<ContextId>, impl Future<Output = OwnedMutexGuard<ContextId>>> {
137 let Ok(guard) = self.lock.clone().try_lock_owned() else {
138 return Either::Right(self.lock.clone().lock_owned());
139 };
140
141 Either::Left(guard)
142 }
143}
144
145impl ContextManager {
146 /// Retrieves context metadata, fetching from the datastore if not present in the cache.
147 ///
148 /// This function implements the "cache-aside" pattern. It first checks the in-memory
149 /// `contexts` BTreeMap. On a cache miss, it falls back to querying the persistent
150 /// `datastore` via the `context_client`, populates the cache with the result,
151 /// and then returns the data.
152 ///
153 /// # Arguments
154 ///
155 /// * `context_id` - The unique identifier of the context to retrieve.
156 ///
157 /// # Returns
158 ///
159 /// A `Result` containing an `Option` with a reference to the `ContextMeta`.
160 /// Returns `Ok(Some(&ContextMeta))` if the context is found in the cache or datastore.
161 /// Returns `Ok(None)` if the context does not exist in the datastore.
162 /// Returns `Err` if a datastore error occurs.
163 fn get_or_fetch_context(
164 &mut self,
165 context_id: &ContextId,
166 ) -> eyre::Result<Option<&ContextMeta>> {
167 let entry = self.contexts.entry(*context_id);
168
169 match entry {
170 btree_map::Entry::Occupied(mut occupied) => {
171 // CRITICAL FIX: Always reload dag_heads from database to get latest state
172 // The dag_heads can be updated by delta_store when receiving network deltas,
173 // but the cached Context object won't reflect these changes.
174 // This was causing all deltas to use genesis as parent instead of actual dag_heads.
175 let handle = self.datastore.handle();
176 let key = calimero_store::key::ContextMeta::new(*context_id);
177
178 if let Some(meta) = handle.get(&key)? {
179 let cached = occupied.get_mut();
180
181 // Update dag_heads if they changed in DB
182 if cached.meta.dag_heads != meta.dag_heads {
183 tracing::debug!(
184 %context_id,
185 old_heads_count = cached.meta.dag_heads.len(),
186 new_heads_count = meta.dag_heads.len(),
187 "Refreshing dag_heads from database (cache was stale)"
188 );
189 cached.meta.dag_heads = meta.dag_heads;
190 }
191
192 // Also update root_hash in case it changed
193 cached.meta.root_hash = meta.root_hash.into();
194 }
195
196 Ok(Some(occupied.into_mut()))
197 }
198 btree_map::Entry::Vacant(vacant) => {
199 let Some(context) = self.context_client.get_context(context_id)? else {
200 return Ok(None);
201 };
202
203 let lock = Arc::new(Mutex::new(*context_id));
204
205 let item = vacant.insert(ContextMeta {
206 meta: context,
207 lock,
208 });
209
210 Ok(Some(item))
211 }
212 }
213 }
214}
215
216// objectives:
217// keep up to N items, refresh entries as they are used
218// garbage collect entries as they expire, or as needed
219// share across tasks efficiently, not prolonging locks
220// managed mutation, so guards aren't held for too long
221//
// result: this should help us share data between clients
// and their actors.
224//
225// pub struct SharedCache<K, V> {
226// cache: DashMap<Key<K>, V>,
227// index: ArcTimedSizedCache<K, Key<K>>,
228// }
229//
230// struct Key<K>(K);
231// struct Cached<V: Copy>(..);
232// ^- aids read without locking
233// downside: Copy on every write