1use crate::assert::{
2 assert_commit_batch, assert_commit_chunks, assert_commit_chunks_new_asset,
3 assert_commit_chunks_update, assert_create_batch, assert_create_chunk,
4};
5use crate::constants::{ASSET_ENCODING_NO_COMPRESSION, ENCODING_CERTIFICATION_ORDER};
6use crate::errors::{
7 JUNO_STORAGE_ERROR_ASSET_MAX_ALLOWED_SIZE, JUNO_STORAGE_ERROR_BATCH_NOT_FOUND,
8 JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH, JUNO_STORAGE_ERROR_CHUNK_NOT_FOUND,
9 JUNO_STORAGE_ERROR_CHUNK_NOT_INCLUDED_IN_BATCH, JUNO_STORAGE_ERROR_CHUNK_TO_COMMIT_NOT_FOUND,
10};
11use crate::runtime::{
12 clear_batch as clear_runtime_batch, clear_expired_batches as clear_expired_runtime_batches,
13 clear_expired_chunks as clear_expired_runtime_chunks, get_batch as get_runtime_batch,
14 get_chunk as get_runtime_chunk, insert_batch as insert_runtime_batch,
15 insert_chunk as insert_runtime_chunk,
16};
17use crate::strategies::{StorageAssertionsStrategy, StorageStateStrategy, StorageUploadStrategy};
18use crate::types::config::StorageConfig;
19use crate::types::interface::{CommitBatch, InitAssetKey, UploadChunk};
20use crate::types::runtime_state::{BatchId, ChunkId};
21use crate::types::store::{
22 Asset, AssetEncoding, AssetKey, Batch, Chunk, EncodingType, ReferenceId,
23};
24use candid::Principal;
25use ic_cdk::api::time;
26use junobuild_collections::types::rules::Rule;
27use junobuild_shared::types::core::Blob;
28use junobuild_shared::types::state::Controllers;
29use std::ptr::addr_of;
30
31const BATCH_EXPIRY_NANOS: u64 = 300_000_000_000;
36
37static mut NEXT_BATCH_ID: BatchId = 0;
38static mut NEXT_CHUNK_ID: ChunkId = 0;
39
40pub fn create_batch(
41 caller: Principal,
42 controllers: &Controllers,
43 config: &StorageConfig,
44 init: InitAssetKey,
45 reference_id: Option<ReferenceId>,
46 assertions: &impl StorageAssertionsStrategy,
47 storage_state: &impl StorageStateStrategy,
48) -> Result<BatchId, String> {
49 assert_create_batch(
50 caller,
51 controllers,
52 config,
53 &init,
54 assertions,
55 storage_state,
56 )?;
57
58 get_encoding_type(&init.encoding_type)?;
60
61 Ok(create_batch_impl(caller, init, reference_id))
62}
63
64fn create_batch_impl(
65 caller: Principal,
66 InitAssetKey {
67 token,
68 name,
69 collection,
70 encoding_type,
71 full_path,
72 description,
73 }: InitAssetKey,
74 reference_id: Option<ReferenceId>,
75) -> BatchId {
76 let now = time();
77
78 unsafe {
79 clear_expired_batches();
80
81 NEXT_BATCH_ID += 1;
82
83 let key: AssetKey = AssetKey {
84 full_path,
85 collection,
86 owner: caller,
87 token,
88 name,
89 description,
90 };
91
92 insert_runtime_batch(
93 &*addr_of!(NEXT_BATCH_ID),
94 Batch {
95 key,
96 reference_id,
97 expires_at: now + BATCH_EXPIRY_NANOS,
98 encoding_type,
99 },
100 );
101
102 NEXT_BATCH_ID
103 }
104}
105
106pub fn create_chunk(
107 caller: Principal,
108 config: &StorageConfig,
109 UploadChunk {
110 batch_id,
111 content,
112 order_id,
113 }: UploadChunk,
114) -> Result<ChunkId, String> {
115 let batch = get_runtime_batch(&batch_id);
116
117 match batch {
118 None => Err(JUNO_STORAGE_ERROR_BATCH_NOT_FOUND.to_string()),
119 Some(b) => {
120 assert_create_chunk(caller, config, &b)?;
121
122 let now = time();
123
124 insert_runtime_batch(
126 &batch_id,
127 Batch {
128 expires_at: now + BATCH_EXPIRY_NANOS,
129 ..b
130 },
131 );
132
133 unsafe {
134 NEXT_CHUNK_ID += 1;
135
136 insert_runtime_chunk(
137 &*addr_of!(NEXT_CHUNK_ID),
138 Chunk {
139 batch_id,
140 content,
141 order_id: order_id.unwrap_or(NEXT_CHUNK_ID),
142 },
143 );
144
145 Ok(NEXT_CHUNK_ID)
146 }
147 }
148 }
149}
150
151pub fn commit_batch(
152 caller: Principal,
153 controllers: &Controllers,
154 config: &StorageConfig,
155 commit_batch: CommitBatch,
156 assertions: &impl StorageAssertionsStrategy,
157 storage_state: &impl StorageStateStrategy,
158 storage_upload: &impl StorageUploadStrategy,
159) -> Result<Asset, String> {
160 let batch = get_runtime_batch(&commit_batch.batch_id);
161
162 match batch {
163 None => Err(JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH.to_string()),
164 Some(b) => {
165 let asset = secure_commit_chunks(
166 caller,
167 controllers,
168 config,
169 commit_batch,
170 &b,
171 assertions,
172 storage_state,
173 storage_upload,
174 )?;
175 Ok(asset)
176 }
177 }
178}
179
180#[allow(clippy::too_many_arguments)]
181fn secure_commit_chunks(
182 caller: Principal,
183 controllers: &Controllers,
184 config: &StorageConfig,
185 commit_batch: CommitBatch,
186 batch: &Batch,
187 assertions: &impl StorageAssertionsStrategy,
188 storage_state: &impl StorageStateStrategy,
189 storage_upload: &impl StorageUploadStrategy,
190) -> Result<Asset, String> {
191 let rule = assert_commit_batch(caller, controllers, batch, assertions, storage_state)?;
192
193 let current = storage_upload.get_asset(
194 &batch.reference_id,
195 &batch.key.collection,
196 &batch.key.full_path,
197 &rule,
198 )?;
199
200 match current {
201 None => {
202 assert_commit_chunks_new_asset(
203 caller,
204 &batch.key.collection,
205 controllers,
206 config,
207 &rule,
208 assertions,
209 )?;
210
211 commit_chunks(
212 caller,
213 controllers,
214 commit_batch,
215 batch,
216 &rule,
217 &None,
218 assertions,
219 storage_upload,
220 )
221 }
222 Some(current) => secure_commit_chunks_update(
223 caller,
224 controllers,
225 config,
226 commit_batch,
227 batch,
228 rule,
229 current,
230 assertions,
231 storage_upload,
232 ),
233 }
234}
235
236#[allow(clippy::too_many_arguments)]
237fn secure_commit_chunks_update(
238 caller: Principal,
239 controllers: &Controllers,
240 config: &StorageConfig,
241 commit_batch: CommitBatch,
242 batch: &Batch,
243 rule: Rule,
244 current: Asset,
245 assertions: &impl StorageAssertionsStrategy,
246 storage_upload: &impl StorageUploadStrategy,
247) -> Result<Asset, String> {
248 assert_commit_chunks_update(
249 caller,
250 controllers,
251 config,
252 batch,
253 &rule,
254 ¤t,
255 assertions,
256 )?;
257
258 commit_chunks(
259 caller,
260 controllers,
261 commit_batch,
262 batch,
263 &rule,
264 &Some(current),
265 assertions,
266 storage_upload,
267 )
268}
269
270#[allow(clippy::too_many_arguments)]
271fn commit_chunks(
272 caller: Principal,
273 controllers: &Controllers,
274 commit_batch: CommitBatch,
275 batch: &Batch,
276 rule: &Rule,
277 current: &Option<Asset>,
278 assertions: &impl StorageAssertionsStrategy,
279 storage_upload: &impl StorageUploadStrategy,
280) -> Result<Asset, String> {
281 let now = time();
282
283 if now > batch.expires_at {
284 clear_expired_batches();
285 return Err("Batch did not complete in time. Chunks cannot be committed.".to_string());
286 }
287
288 assert_commit_chunks(
289 caller,
290 controllers,
291 &commit_batch,
292 batch,
293 current,
294 rule,
295 assertions,
296 )?;
297
298 let CommitBatch {
299 chunk_ids,
300 batch_id,
301 headers,
302 } = commit_batch;
303
304 let mut chunks: Vec<Chunk> = vec![];
306
307 for chunk_id in chunk_ids.iter() {
308 let chunk = get_runtime_chunk(chunk_id);
309
310 match chunk {
311 None => {
312 return Err(JUNO_STORAGE_ERROR_CHUNK_NOT_FOUND.to_string());
313 }
314 Some(c) => {
315 if batch_id != c.batch_id {
316 return Err(JUNO_STORAGE_ERROR_CHUNK_NOT_INCLUDED_IN_BATCH.to_string());
317 }
318
319 chunks.push(c);
320 }
321 }
322 }
323
324 chunks.sort_by(|a, b| a.order_id.cmp(&b.order_id));
326
327 let mut content_chunks: Vec<Blob> = vec![];
328
329 for c in chunks.iter() {
331 content_chunks.push(c.content.clone());
332 }
333
334 if content_chunks.is_empty() {
335 return Err(JUNO_STORAGE_ERROR_CHUNK_TO_COMMIT_NOT_FOUND.to_string());
336 }
337
338 let owner = current.as_ref().map_or(caller, |asset| asset.key.owner);
341
342 let key = AssetKey {
343 owner,
344 ..batch.clone().key
345 };
346
347 let mut asset: Asset = Asset::prepare(key, headers, current);
348
349 let encoding_type = get_encoding_type(&batch.encoding_type)?;
350
351 let encoding = AssetEncoding::from(&content_chunks);
352
353 match rule.max_size {
354 None => (),
355 Some(max_size) => {
356 if encoding.total_length > max_size {
357 clear_runtime_batch(&batch_id, &chunk_ids);
358 return Err(JUNO_STORAGE_ERROR_ASSET_MAX_ALLOWED_SIZE.to_string());
359 }
360 }
361 }
362
363 storage_upload.insert_asset_encoding(
364 &batch.reference_id,
365 &batch.key.full_path,
366 &encoding_type,
367 &encoding,
368 &mut asset,
369 rule,
370 )?;
371
372 storage_upload.insert_asset(batch, &asset, rule)?;
373
374 clear_runtime_batch(&batch_id, &chunk_ids);
375
376 Ok(asset)
377}
378
379fn get_encoding_type(encoding_type: &Option<EncodingType>) -> Result<EncodingType, &'static str> {
380 let provided_type = encoding_type
381 .clone()
382 .unwrap_or_else(|| ASSET_ENCODING_NO_COMPRESSION.to_string());
383
384 let matching_type = Vec::from(ENCODING_CERTIFICATION_ORDER)
385 .iter()
386 .any(|&e| *e == provided_type);
387
388 if !matching_type {
389 return Err("Asset encoding not supported for certification purpose.");
390 }
391
392 Ok(provided_type)
393}
394
395fn clear_expired_batches() {
396 clear_expired_runtime_batches();
398
399 clear_expired_runtime_chunks();
401}