Skip to main content

calimero_context/handlers/
join_context.rs

1use actix::{Handler, Message, ResponseFuture};
2use calimero_context_primitives::client::crypto::ContextIdentity;
3use calimero_context_primitives::client::ContextClient;
4use calimero_context_primitives::messages::{JoinContextRequest, JoinContextResponse};
5use calimero_node_primitives::client::NodeClient;
6use calimero_primitives::context::{ContextConfigParams, ContextId, ContextInvitationPayload};
7use calimero_primitives::identity::{PrivateKey, PublicKey};
8use eyre::eyre;
9
10use crate::ContextManager;
11
12impl Handler<JoinContextRequest> for ContextManager {
13    type Result = ResponseFuture<<JoinContextRequest as Message>::Result>;
14
15    fn handle(
16        &mut self,
17        JoinContextRequest { invitation_payload }: JoinContextRequest,
18        _ctx: &mut Self::Context,
19    ) -> Self::Result {
20        let node_client = self.node_client.clone();
21        let context_client = self.context_client.clone();
22
23        let task = async move {
24            let (context_id, invitee_id) =
25                join_context(node_client, context_client, invitation_payload).await?;
26
27            Ok(JoinContextResponse {
28                context_id,
29                member_public_key: invitee_id,
30            })
31        };
32
33        Box::pin(task)
34    }
35}
36
37async fn join_context(
38    node_client: NodeClient,
39    context_client: ContextClient,
40    invitation_payload: ContextInvitationPayload,
41) -> eyre::Result<(ContextId, PublicKey)> {
42    let (context_id, invitee_id, protocol, network_id, contract_id) = invitation_payload.parts()?;
43
44    tracing::info!(%context_id, %invitee_id, "join_context: starting join flow");
45
46    // Check if we already have a fully setup identity for this context
47    let already_joined = context_client
48        .get_identity(&context_id, &invitee_id)?
49        .and_then(|i| i.private_key)
50        .is_some();
51
52    tracing::info!(%context_id, %invitee_id, already_joined, "join_context: checked if already joined");
53
54    if already_joined {
55        // Identity exists, but check if state is initialized
56        // DAG heads being empty means no state has been synced yet
57        // (even if root_hash != [0;32] from external config sync)
58        let context = context_client.get_context(&context_id)?;
59        let needs_sync = context
60            .map(|ctx| {
61                let empty = ctx.dag_heads.is_empty();
62                tracing::info!(
63                    %context_id,
64                    %invitee_id,
65                    dag_heads_count = ctx.dag_heads.len(),
66                    root_hash = %ctx.root_hash,
67                    needs_sync = empty,
68                    "join_context: identity already exists, checking if sync needed"
69                );
70                empty
71            })
72            .unwrap_or(true); // If context doesn't exist, we definitely need sync
73
74        if needs_sync {
75            tracing::info!(%context_id, %invitee_id, "join_context: triggering sync for already-joined context with empty DAG heads");
76            // State is uninitialized - subscribe and trigger sync
77            node_client.subscribe(&context_id).await?;
78            node_client.sync(Some(&context_id), None).await?;
79        }
80
81        return Ok((context_id, invitee_id));
82    }
83
84    let stored_identity = context_client
85        .get_identity(&ContextId::zero(), &invitee_id)?
86        .ok_or_else(|| eyre!("missing identity for public key: {}", invitee_id))?;
87
88    let identity_secret = stored_identity
89        .private_key
90        .ok_or_else(|| eyre!("stored identity '{}' is missing private key", invitee_id))?;
91
92    if identity_secret.public_key() != invitee_id {
93        eyre::bail!("identity mismatch")
94    }
95
96    let mut config = None;
97
98    if !context_client.has_context(&context_id)? {
99        let mut external_config = ContextConfigParams {
100            protocol: protocol.into(),
101            network_id: network_id.into(),
102            contract_id: contract_id.into(),
103            proxy_contract: "".into(),
104            application_revision: 0,
105            members_revision: 0,
106        };
107
108        let external_client = context_client.external_client(&context_id, &external_config)?;
109
110        let config_client = external_client.config();
111
112        let proxy_contract = config_client.get_proxy_contract().await?;
113
114        external_config.proxy_contract = proxy_contract.into();
115
116        config = Some(external_config);
117    };
118
119    let _ignored = context_client
120        .sync_context_config(context_id, config)
121        .await?;
122
123    if !context_client.has_member(&context_id, &invitee_id)? {
124        eyre::bail!("unable to join context: not a member, invalid invitation?")
125    }
126
127    let mut rng = rand::thread_rng();
128
129    let sender_key = PrivateKey::random(&mut rng);
130
131    context_client.update_identity(
132        &context_id,
133        &ContextIdentity {
134            public_key: invitee_id,
135            private_key: Some(identity_secret),
136            sender_key: Some(sender_key),
137        },
138    )?;
139
140    // Delete the identity from the zero context (a.k.a. identity pool),
141    // because we just assigned that identity to the new context.
142    context_client.delete_identity(&ContextId::zero(), &invitee_id)?;
143
144    tracing::info!(%context_id, %invitee_id, "join_context: NEW join - calling subscribe and sync");
145    node_client.subscribe(&context_id).await?;
146
147    node_client.sync(Some(&context_id), None).await?;
148    tracing::info!(%context_id, %invitee_id, "join_context: sync request sent successfully");
149
150    Ok((context_id, invitee_id))
151}