Skip to main content

calimero_context/handlers/
create_context.rs

1use std::collections::{btree_map, BTreeMap};
2use std::mem;
3// Removed: NonZeroUsize (DAG-based approach)
4use std::sync::Arc;
5
6use actix::{ActorResponse, ActorTryFutureExt, Handler, Message, WrapFuture};
7use calimero_context_config::client::config::ClientConfig as ExternalClientConfig;
8use calimero_context_config::client::utils::humanize_iter;
9use calimero_context_primitives::client::ContextClient;
10use calimero_context_primitives::messages::{CreateContextRequest, CreateContextResponse};
11use calimero_node_primitives::client::NodeClient;
12use calimero_primitives::application::{Application, ApplicationId};
13use calimero_primitives::context::{Context, ContextConfigParams, ContextId};
14use calimero_primitives::hash::Hash;
15use calimero_primitives::identity::{PrivateKey, PublicKey};
16use calimero_storage::delta::{CausalDelta, StorageDelta};
17use calimero_store::{key, types, Store};
18use eyre::{bail, OptionExt};
19use rand::rngs::StdRng;
20use rand::SeedableRng;
21use tokio::sync::{Mutex, OwnedMutexGuard};
22use tracing::{debug, warn};
23
24use super::execute::execute;
25use super::execute::storage::{ContextPrivateStorage, ContextStorage};
26use crate::{ContextManager, ContextMeta};
27
28impl Handler<CreateContextRequest> for ContextManager {
29    type Result = ActorResponse<Self, <CreateContextRequest as Message>::Result>;
30
31    fn handle(
32        &mut self,
33        CreateContextRequest {
34            protocol,
35            seed,
36            application_id,
37            identity_secret,
38            init_params,
39        }: CreateContextRequest,
40        _ctx: &mut Self::Context,
41    ) -> Self::Result {
42        let prepared = match Prepared::new(
43            &self.node_client,
44            &self.context_client,
45            &self.external_config,
46            &mut self.contexts,
47            &mut self.applications,
48            protocol,
49            seed,
50            &application_id,
51            identity_secret,
52        ) {
53            Ok(res) => res,
54            Err(err) => return ActorResponse::reply(Err(err)),
55        };
56
57        let Prepared {
58            external_config,
59            application,
60            context,
61            context_secret,
62            identity,
63            identity_secret,
64            sender_key,
65        } = prepared;
66
67        let guard = context
68            .lock
69            .clone()
70            .try_lock_owned()
71            .expect("logically exclusive");
72
73        let context_meta = context.meta.clone();
74
75        let module_task = self.get_module(application_id);
76
77        let context_meta_for_map_ok = context_meta.clone();
78        let context_meta_for_map_err = context_meta.clone();
79
80        ActorResponse::r#async(
81            module_task
82                .and_then(move |module, act, _ctx| {
83                    create_context(
84                        act.datastore.clone(),
85                        act.node_client.clone(),
86                        act.context_client.clone(),
87                        module,
88                        external_config,
89                        context_meta,
90                        context_secret,
91                        application,
92                        identity,
93                        identity_secret,
94                        sender_key,
95                        init_params,
96                        guard,
97                    )
98                    .into_actor(act)
99                })
100                .map_ok(move |root_hash, act, _ctx| {
101                    if let Some(meta) = act.contexts.get_mut(&context_meta_for_map_ok.id) {
102                        // this should almost always exist, but with an LruCache, it
103                        // may not. And if it's been evicted, the next execution will
104                        // re-create it with data from the store, so it's not a problem
105
106                        meta.meta.root_hash = root_hash;
107                    }
108
109                    CreateContextResponse {
110                        context_id: context_meta_for_map_ok.id,
111                        identity,
112                    }
113                })
114                .map_err(move |err, act, _ctx| {
115                    let _ignored = act.contexts.remove(&context_meta_for_map_err.id);
116
117                    err
118                }),
119        )
120    }
121}
122
/// Everything `Prepared::new` resolves synchronously before the asynchronous
/// part of context creation starts.
struct Prepared<'a> {
    /// Protocol, network and contract parameters taken from the node's
    /// external client configuration for the requested protocol.
    external_config: ContextConfigParams<'static>,
    /// The application the new context is created for (an owned clone of the
    /// cached entry).
    application: Application,
    /// The entry that was just inserted into the manager's context map.
    context: &'a ContextMeta,
    /// Private key whose public key is used as the context id.
    context_secret: PrivateKey,
    /// Public key corresponding to `identity_secret`.
    identity: PublicKey,
    /// Identity key supplied in the request, or a randomly generated one.
    identity_secret: PrivateKey,
    /// Randomly generated sender key.
    sender_key: PrivateKey,
}
132
133impl Prepared<'_> {
134    fn new(
135        node_client: &NodeClient,
136        context_client: &ContextClient,
137        external_config: &ExternalClientConfig,
138        contexts: &mut BTreeMap<ContextId, ContextMeta>,
139        applications: &mut BTreeMap<ApplicationId, Application>,
140        protocol: String,
141        seed: Option<[u8; 32]>,
142        application_id: &ApplicationId,
143        identity_secret: Option<PrivateKey>,
144    ) -> eyre::Result<Self> {
145        let Some(external_config) = external_config.params.get(&protocol) else {
146            bail!(
147                "unsupported protocol: {}, expected one of `{}`",
148                protocol,
149                humanize_iter(external_config.params.keys())
150            );
151        };
152
153        let external_config = ContextConfigParams {
154            protocol: protocol.into(),
155            network_id: external_config.network.clone().into(),
156            contract_id: external_config.contract_id.clone().into(),
157            // vv not used for context creation --
158            proxy_contract: "".into(),
159            application_revision: 0,
160            members_revision: 0,
161            // ^^ not used for context creation --
162        };
163
164        let mut rng = rand::thread_rng();
165
166        let sender_key = PrivateKey::random(&mut rng);
167
168        let identity_secret = identity_secret.unwrap_or_else(|| PrivateKey::random(&mut rng));
169
170        let mut context = None;
171        for _ in 0..5 {
172            let context_secret = if let Some(seed) = seed {
173                if context.is_some() {
174                    bail!("seed resulted in an already existing context");
175                }
176
177                PrivateKey::random(&mut StdRng::from_seed(seed))
178            } else {
179                PrivateKey::random(&mut rng)
180            };
181
182            context = Some(None);
183
184            let context_id = ContextId::from(*context_secret.public_key());
185
186            if let btree_map::Entry::Vacant(entry) = contexts.entry(context_id) {
187                if context_client.has_context(&context_id)? {
188                    continue;
189                }
190
191                // safety: the VacantEntry only lives as long as this function
192                //         and the entry within the BTreeMap is constrained to
193                //         the lifetime of the BTreeMap before it is returned
194                let entry = unsafe {
195                    mem::transmute::<_, btree_map::VacantEntry<'static, ContextId, ContextMeta>>(
196                        entry,
197                    )
198                };
199
200                context = Some(Some((entry, context_id, context_secret)));
201
202                break;
203            }
204        }
205        let (entry, context_id, context_secret) = context
206            .flatten()
207            .ok_or_eyre("failed to derive a context id after 5 tries")?;
208
209        let application = match applications.entry(*application_id) {
210            btree_map::Entry::Vacant(vacant) => {
211                let application = node_client
212                    .get_application(application_id)?
213                    .ok_or_eyre("application not found")?;
214
215                vacant.insert(application)
216            }
217            btree_map::Entry::Occupied(occupied) => occupied.into_mut(),
218        };
219
220        let identity = identity_secret.public_key();
221
222        let meta = Context::new(context_id, *application_id, Hash::default());
223
224        let context = entry.insert(ContextMeta {
225            meta,
226            lock: Arc::new(Mutex::new(context_id)),
227        });
228
229        let application = application.clone();
230
231        Ok(Self {
232            external_config,
233            application,
234            context,
235            context_secret,
236            identity,
237            identity_secret,
238            sender_key,
239        })
240    }
241}
242
243async fn create_context(
244    datastore: Store,
245    node_client: NodeClient,
246    context_client: ContextClient,
247    module: calimero_runtime::Module,
248    external_config: ContextConfigParams<'_>,
249    mut context: Context,
250    context_secret: PrivateKey,
251    application: Application,
252    identity: PublicKey,
253    identity_secret: PrivateKey,
254    sender_key: PrivateKey,
255    init_params: Vec<u8>,
256    guard: OwnedMutexGuard<ContextId>,
257) -> eyre::Result<Hash> {
258    let storage = ContextStorage::from(datastore.clone(), context.id);
259    // Create private storage (node-local, NOT synchronized)
260    let private_storage = ContextPrivateStorage::from(datastore, context.id);
261
262    let (outcome, storage, private_storage) = execute(
263        &guard,
264        module,
265        identity,
266        "init".into(),
267        init_params.into(),
268        storage,
269        private_storage,
270        node_client.clone(),
271        // When init runs, the context creation is not yet committed to the database.
272        // The creator's identity is only written to the store after init succeeds.
273        // Therefore, even if a `StoreContextHost` is passed, it would look at the DB and find 0 members.
274        // Passing None accurately reflects that the "Host Node" doesn't technically know about this context's state yet.
275        None,
276    )
277    .await?;
278
279    if let Some(res) = outcome.returns? {
280        bail!(
281            "context initialization returned a value, but it should not: {:?}",
282            res
283        );
284    }
285
286    let init_delta = if let Some(root_hash) = outcome.root_hash {
287        context.root_hash = root_hash.into();
288
289        // CRITICAL: Create delta and set dag_heads for init()
290        // This ensures newly joined nodes can sync via delta protocol
291        let actions = if !outcome.artifact.is_empty() {
292            // Extract actions from init artifact
293            match borsh::from_slice::<StorageDelta>(&outcome.artifact) {
294                Ok(StorageDelta::Actions(actions)) => actions,
295                Ok(_) => {
296                    warn!("Unexpected StorageDelta variant during init");
297                    vec![]
298                }
299                Err(e) => {
300                    warn!(?e, "Failed to deserialize init artifact");
301                    vec![]
302                }
303            }
304        } else {
305            vec![]
306        };
307
308        // Always create a genesis delta. The parent should be `[0; 32]` (genesis).
309        // This way, the DAG will have a head that is associated with a delta even if state is empty.
310        let hlc = calimero_storage::env::hlc_timestamp();
311        // Genesis parent
312        let parents = vec![[0u8; 32]];
313        let delta_id = CausalDelta::compute_id(&parents, &actions, &hlc);
314
315        context.dag_heads = vec![delta_id];
316
317        // Persist the init delta so peers can request it
318        let serialized_actions = borsh::to_vec(&actions)?;
319
320        let delta = types::ContextDagDelta {
321            delta_id,
322            parents,
323            actions: serialized_actions,
324            hlc,
325            applied: true,
326            expected_root_hash: root_hash,
327            // Genesis delta has no events
328            events: None,
329        };
330
331        debug!(
332            context_id = %context.id,
333            delta_id = ?delta_id,
334            actions_count = actions.len(),
335            "Created genesis delta with dag_heads"
336        );
337
338        Some(delta)
339    } else {
340        None
341    };
342
343    let external_client = context_client.external_client(&context.id, &external_config)?;
344
345    let config_client = external_client.config();
346
347    config_client
348        .add_context(&context_secret, &identity, &application)
349        .await?;
350
351    let proxy_contract = config_client.get_proxy_contract().await?;
352
353    let datastore = storage.commit()?;
354    // Commit private storage (node-local, NOT synchronized)
355    let _private_datastore = private_storage.commit()?;
356
357    // Height-based delta tracking removed - now using DAG-based approach
358
359    let mut handle = datastore.handle();
360
361    handle.put(
362        &key::ContextConfig::new(context.id),
363        &types::ContextConfig::new(
364            external_config.protocol.into_owned().into_boxed_str(),
365            external_config.network_id.into_owned().into_boxed_str(),
366            external_config.contract_id.into_owned().into_boxed_str(),
367            proxy_contract.into_boxed_str(),
368            external_config.application_revision,
369            external_config.members_revision,
370        ),
371    )?;
372
373    handle.put(
374        &key::ContextMeta::new(context.id),
375        &types::ContextMeta::new(
376            key::ApplicationMeta::new(application.id),
377            *context.root_hash,
378            context.dag_heads.clone(),
379        ),
380    )?;
381
382    // Persist init delta if created
383    if let Some(delta) = init_delta {
384        handle.put(
385            &key::ContextDagDelta::new(context.id, delta.delta_id),
386            &delta,
387        )?;
388
389        debug!(
390            context_id = %context.id,
391            delta_id = ?delta.delta_id,
392            "Persisted init delta to database"
393        );
394    }
395
396    handle.put(
397        &key::ContextIdentity::new(context.id, identity),
398        &types::ContextIdentity {
399            private_key: Some(*identity_secret),
400            sender_key: Some(*sender_key),
401        },
402    )?;
403
404    node_client.subscribe(&context.id).await?;
405
406    Ok(context.root_hash)
407}