calimero_context/handlers/
join_context.rs1use actix::{Handler, Message, ResponseFuture};
2use calimero_context_primitives::client::crypto::ContextIdentity;
3use calimero_context_primitives::client::ContextClient;
4use calimero_context_primitives::messages::{JoinContextRequest, JoinContextResponse};
5use calimero_node_primitives::client::NodeClient;
6use calimero_primitives::context::{ContextConfigParams, ContextId, ContextInvitationPayload};
7use calimero_primitives::identity::{PrivateKey, PublicKey};
8use eyre::eyre;
9
10use crate::ContextManager;
11
12impl Handler<JoinContextRequest> for ContextManager {
13 type Result = ResponseFuture<<JoinContextRequest as Message>::Result>;
14
15 fn handle(
16 &mut self,
17 JoinContextRequest { invitation_payload }: JoinContextRequest,
18 _ctx: &mut Self::Context,
19 ) -> Self::Result {
20 let node_client = self.node_client.clone();
21 let context_client = self.context_client.clone();
22
23 let task = async move {
24 let (context_id, invitee_id) =
25 join_context(node_client, context_client, invitation_payload).await?;
26
27 Ok(JoinContextResponse {
28 context_id,
29 member_public_key: invitee_id,
30 })
31 };
32
33 Box::pin(task)
34 }
35}
36
37async fn join_context(
38 node_client: NodeClient,
39 context_client: ContextClient,
40 invitation_payload: ContextInvitationPayload,
41) -> eyre::Result<(ContextId, PublicKey)> {
42 let (context_id, invitee_id, protocol, network_id, contract_id) = invitation_payload.parts()?;
43
44 tracing::info!(%context_id, %invitee_id, "join_context: starting join flow");
45
46 let already_joined = context_client
48 .get_identity(&context_id, &invitee_id)?
49 .and_then(|i| i.private_key)
50 .is_some();
51
52 tracing::info!(%context_id, %invitee_id, already_joined, "join_context: checked if already joined");
53
54 if already_joined {
55 let context = context_client.get_context(&context_id)?;
59 let needs_sync = context
60 .map(|ctx| {
61 let empty = ctx.dag_heads.is_empty();
62 tracing::info!(
63 %context_id,
64 %invitee_id,
65 dag_heads_count = ctx.dag_heads.len(),
66 root_hash = %ctx.root_hash,
67 needs_sync = empty,
68 "join_context: identity already exists, checking if sync needed"
69 );
70 empty
71 })
72 .unwrap_or(true); if needs_sync {
75 tracing::info!(%context_id, %invitee_id, "join_context: triggering sync for already-joined context with empty DAG heads");
76 node_client.subscribe(&context_id).await?;
78 node_client.sync(Some(&context_id), None).await?;
79 }
80
81 return Ok((context_id, invitee_id));
82 }
83
84 let stored_identity = context_client
85 .get_identity(&ContextId::zero(), &invitee_id)?
86 .ok_or_else(|| eyre!("missing identity for public key: {}", invitee_id))?;
87
88 let identity_secret = stored_identity
89 .private_key
90 .ok_or_else(|| eyre!("stored identity '{}' is missing private key", invitee_id))?;
91
92 if identity_secret.public_key() != invitee_id {
93 eyre::bail!("identity mismatch")
94 }
95
96 let mut config = None;
97
98 if !context_client.has_context(&context_id)? {
99 let mut external_config = ContextConfigParams {
100 protocol: protocol.into(),
101 network_id: network_id.into(),
102 contract_id: contract_id.into(),
103 proxy_contract: "".into(),
104 application_revision: 0,
105 members_revision: 0,
106 };
107
108 let external_client = context_client.external_client(&context_id, &external_config)?;
109
110 let config_client = external_client.config();
111
112 let proxy_contract = config_client.get_proxy_contract().await?;
113
114 external_config.proxy_contract = proxy_contract.into();
115
116 config = Some(external_config);
117 };
118
119 let _ignored = context_client
120 .sync_context_config(context_id, config)
121 .await?;
122
123 if !context_client.has_member(&context_id, &invitee_id)? {
124 eyre::bail!("unable to join context: not a member, invalid invitation?")
125 }
126
127 let mut rng = rand::thread_rng();
128
129 let sender_key = PrivateKey::random(&mut rng);
130
131 context_client.update_identity(
132 &context_id,
133 &ContextIdentity {
134 public_key: invitee_id,
135 private_key: Some(identity_secret),
136 sender_key: Some(sender_key),
137 },
138 )?;
139
140 context_client.delete_identity(&ContextId::zero(), &invitee_id)?;
143
144 tracing::info!(%context_id, %invitee_id, "join_context: NEW join - calling subscribe and sync");
145 node_client.subscribe(&context_id).await?;
146
147 node_client.sync(Some(&context_id), None).await?;
148 tracing::info!(%context_id, %invitee_id, "join_context: sync request sent successfully");
149
150 Ok((context_id, invitee_id))
151}