1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
//! Blob / application fetching for [`SyncManager`]: resolving a context's
//! blob id + application config, querying application size/source, and
//! installing a bundle after blob sharing. Extracted from the manager
//! god-file as an `impl SyncManager` fragment.
use calimero_node_primitives::client::NodeClient;
use calimero_primitives::application::ApplicationId;
use calimero_primitives::context::ContextId;
use calimero_primitives::events::{
AppVersionChangedPayload, ContextEvent, ContextEventPayload, NodeEvent,
};
use eyre::bail;
use tracing::{debug, warn};
use super::SyncManager;
impl SyncManager {
/// Get blob ID and application config from application or context config
pub(super) async fn get_blob_info(
&self,
context_id: &ContextId,
application: &Option<calimero_primitives::application::Application>,
) -> eyre::Result<(
calimero_primitives::blobs::BlobId,
Option<calimero_primitives::application::Application>,
)> {
if let Some(ref app) = application {
Ok((app.blob.bytecode, None))
} else {
// Application not found - get blob_id from context config
let app_config = self
.context_client
.get_context_application(context_id)
.await?;
Ok((app_config.blob.bytecode, Some(app_config)))
}
}
/// Get application size from application, cached config, or context config
pub(super) async fn get_application_size(
&self,
context_id: &ContextId,
application: &Option<calimero_primitives::application::Application>,
app_config_opt: &Option<calimero_primitives::application::Application>,
) -> eyre::Result<u64> {
if let Some(ref app) = application {
Ok(app.size)
} else if let Some(ref app_config) = app_config_opt {
Ok(app_config.size)
} else {
let app_config = self
.context_client
.get_context_application(context_id)
.await?;
Ok(app_config.size)
}
}
/// Get application source from cached config or context config
async fn get_application_source(
&self,
context_id: &ContextId,
app_config_opt: &Option<calimero_primitives::application::Application>,
) -> eyre::Result<calimero_primitives::application::ApplicationSource> {
if let Some(ref app_config) = app_config_opt {
Ok(app_config.source.clone())
} else {
let app_config = self
.context_client
.get_context_application(context_id)
.await?;
Ok(app_config.source.clone())
}
}
/// Install bundle application after blob sharing completes.
///
/// Returns `Some(installed_application)` if a bundle was installed,
/// `None` otherwise. Updates `context.application_id` if the installed
/// ApplicationId differs from the context's ApplicationId.
pub(super) async fn install_bundle_after_blob_sharing(
&self,
context_id: &ContextId,
blob_id: &calimero_primitives::blobs::BlobId,
app_config_opt: &Option<calimero_primitives::application::Application>,
context: &mut calimero_primitives::context::Context,
application: &mut Option<calimero_primitives::application::Application>,
) -> eyre::Result<()> {
// Only proceed if blob is now available locally
if !self.node_client.has_blob(blob_id)? {
return Ok(());
}
// Check if blob is a bundle
let Some(blob_bytes) = self.node_client.get_blob_bytes(blob_id, None).await? else {
return Ok(());
};
// Wrap blocking I/O in spawn_blocking to avoid blocking async runtime
let blob_bytes_clone = blob_bytes.clone();
let is_bundle =
tokio::task::spawn_blocking(move || NodeClient::is_bundle_blob(&blob_bytes_clone))
.await?;
// Get source from context config (use cached if available, otherwise fetch)
let source = self
.get_application_source(context_id, app_config_opt)
.await?;
let installed_app_id = if is_bundle {
self.node_client
.install_application_from_bundle_blob(blob_id, &source)
.await
.map_err(|e| {
eyre::eyre!(
"Failed to install bundle application from blob {}: {}",
blob_id,
e
)
})?
} else {
// For non-bundle apps, write ApplicationMeta directly under the
// known application_id rather than re-deriving it via
// install_application (which hashes source+metadata and would
// produce a different ID than the original installer used).
let size = blob_bytes.len() as u64;
let mut handle = self.context_client.datastore_handle();
handle.put(
&calimero_store::key::ApplicationMeta::new(context.application_id),
&calimero_store::types::ApplicationMeta::new(
calimero_store::key::BlobMeta::new(*blob_id),
size,
source.to_string().into_boxed_str(),
Box::default(),
calimero_store::key::BlobMeta::new(calimero_primitives::blobs::BlobId::from(
[0u8; 32],
)),
"unknown".to_owned().into_boxed_str(),
"0.0.0".to_owned().into_boxed_str(),
String::new().into_boxed_str(),
),
)?;
context.application_id
};
// Verify installation succeeded by fetching the installed application
let installed_application = self
.node_client
.get_application(&installed_app_id)
.map_err(|e| {
eyre::eyre!(
"Failed to verify bundle installation for application {}: {}",
installed_app_id,
e
)
})?;
let Some(installed_application) = installed_application else {
bail!(
"Bundle installation reported success but application {} is not retrievable",
installed_app_id
);
};
// Check if the installed ApplicationId matches the context's ApplicationId
if installed_app_id != context.application_id {
warn!(
installed_app_id = %installed_app_id,
context_app_id = %context.application_id,
"Installed application ID does not match context application ID, updating to installed ID"
);
// Capture the pre-flip id for the AppVersionChanged emit below; this
// is a durable application flip (this node just learned, via blob
// sync, that its context's app changed), so it must notify
// subscribers like the update_application workers do.
let old_app_id = context.application_id;
// Update context with the installed application ID for consistency
context.application_id = installed_app_id;
// Persist the ApplicationId change to the database
// This is critical: if we don't persist, the old ApplicationId will be
// used on node restart, causing application lookup failures
self.context_client
.update_context_application_id(context_id, installed_app_id)
.map_err(|e| {
eyre::eyre!(
"Failed to persist ApplicationId update for context {}: {}",
context_id,
e
)
})?;
debug!(
%context_id,
installed_app_id = %installed_app_id,
"Persisted ApplicationId update to database"
);
// Notify subscribers of the version flip (skew #2). Best-effort, like
// the update_application emit. The guard above is the dedup (only a
// genuine id change reaches here). to_version comes straight off the
// installed Application; from_version resolves the old app row.
let event = NodeEvent::Context(ContextEvent {
context_id: *context_id,
payload: ContextEventPayload::AppVersionChanged(AppVersionChangedPayload {
from_version: self.application_version(old_app_id),
to_version: Some(installed_application.version.clone())
.filter(|v| !v.is_empty()),
}),
});
let _ = self.node_client.send_event(event);
}
// Use the verified installed application
*application = Some(installed_application);
Ok(())
}
/// Resolves an application's semver from its `ApplicationMeta` row via the
/// context store; `None` when the row is absent. Labels the from-version of
/// the blob-sync `AppVersionChanged` emit (mirrors the context-handler
/// `application_version` helper).
fn application_version(&self, application_id: ApplicationId) -> Option<String> {
self.context_client
.datastore_handle()
.get(&calimero_store::key::ApplicationMeta::new(application_id))
.ok()
.flatten()
.map(|meta| meta.version.to_string())
}
}