1use crate::assert::{
2 assert_commit_batch, assert_commit_chunks, assert_commit_chunks_new_asset,
3 assert_commit_chunks_update, assert_create_batch, assert_create_chunk,
4};
5use crate::constants::{ASSET_ENCODING_NO_COMPRESSION, ENCODING_CERTIFICATION_ORDER};
6use crate::errors::JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH;
7use crate::runtime::{
8 clear_batch as clear_runtime_batch, clear_expired_batches as clear_expired_runtime_batches,
9 clear_expired_chunks as clear_expired_runtime_chunks, get_batch as get_runtime_batch,
10 get_chunk as get_runtime_chunk, insert_batch as insert_runtime_batch,
11 insert_chunk as insert_runtime_chunk,
12};
13use crate::strategies::{StorageAssertionsStrategy, StorageStateStrategy, StorageUploadStrategy};
14use crate::types::config::StorageConfig;
15use crate::types::interface::{CommitBatch, InitAssetKey, UploadChunk};
16use crate::types::runtime_state::{BatchId, ChunkId};
17use crate::types::store::{
18 Asset, AssetEncoding, AssetKey, Batch, Chunk, EncodingType, ReferenceId,
19};
20use candid::Principal;
21use ic_cdk::api::time;
22use junobuild_collections::types::rules::Rule;
23use junobuild_shared::types::core::Blob;
24use junobuild_shared::types::state::Controllers;
25use std::ptr::addr_of;
26
/// Lifetime granted to a batch, in nanoseconds (5 minutes).
///
/// Added to the current `time()` whenever a batch is created or receives a new chunk.
const BATCH_EXPIRY_NANOS: u64 = 5 * 60 * 1_000_000_000;
32
// Counter backing batch ids. It is incremented before each new batch is inserted,
// so the first id handed out is 1.
// NOTE(review): mutable static accessed through `unsafe`; presumably sound because the
// canister processes one message at a time — confirm.
static mut NEXT_BATCH_ID: BatchId = 0;
// Counter backing chunk ids. Incremented before each new chunk is inserted; the new value
// is also used as the chunk's `order_id` when the uploader does not provide one.
static mut NEXT_CHUNK_ID: ChunkId = 0;
35
36pub fn create_batch(
37 caller: Principal,
38 controllers: &Controllers,
39 config: &StorageConfig,
40 init: InitAssetKey,
41 reference_id: Option<ReferenceId>,
42 storage_state: &impl StorageStateStrategy,
43) -> Result<BatchId, String> {
44 assert_create_batch(caller, controllers, config, &init, storage_state)?;
45
46 get_encoding_type(&init.encoding_type)?;
48
49 Ok(create_batch_impl(caller, init, reference_id))
50}
51
52fn create_batch_impl(
53 caller: Principal,
54 InitAssetKey {
55 token,
56 name,
57 collection,
58 encoding_type,
59 full_path,
60 description,
61 }: InitAssetKey,
62 reference_id: Option<ReferenceId>,
63) -> BatchId {
64 let now = time();
65
66 unsafe {
67 clear_expired_batches();
68
69 NEXT_BATCH_ID += 1;
70
71 let key: AssetKey = AssetKey {
72 full_path,
73 collection,
74 owner: caller,
75 token,
76 name,
77 description,
78 };
79
80 insert_runtime_batch(
81 &*addr_of!(NEXT_BATCH_ID),
82 Batch {
83 key,
84 reference_id,
85 expires_at: now + BATCH_EXPIRY_NANOS,
86 encoding_type,
87 },
88 );
89
90 NEXT_BATCH_ID
91 }
92}
93
94pub fn create_chunk(
95 caller: Principal,
96 config: &StorageConfig,
97 UploadChunk {
98 batch_id,
99 content,
100 order_id,
101 }: UploadChunk,
102) -> Result<ChunkId, String> {
103 let batch = get_runtime_batch(&batch_id);
104
105 match batch {
106 None => Err("Batch not found.".to_string()),
107 Some(b) => {
108 assert_create_chunk(caller, config, &b)?;
109
110 let now = time();
111
112 insert_runtime_batch(
114 &batch_id,
115 Batch {
116 expires_at: now + BATCH_EXPIRY_NANOS,
117 ..b
118 },
119 );
120
121 unsafe {
122 NEXT_CHUNK_ID += 1;
123
124 insert_runtime_chunk(
125 &*addr_of!(NEXT_CHUNK_ID),
126 Chunk {
127 batch_id,
128 content,
129 order_id: order_id.unwrap_or(NEXT_CHUNK_ID),
130 },
131 );
132
133 Ok(NEXT_CHUNK_ID)
134 }
135 }
136 }
137}
138
139pub fn commit_batch(
140 caller: Principal,
141 controllers: &Controllers,
142 config: &StorageConfig,
143 commit_batch: CommitBatch,
144 assertions: &impl StorageAssertionsStrategy,
145 storage_state: &impl StorageStateStrategy,
146 storage_upload: &impl StorageUploadStrategy,
147) -> Result<Asset, String> {
148 let batch = get_runtime_batch(&commit_batch.batch_id);
149
150 match batch {
151 None => Err(JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH.to_string()),
152 Some(b) => {
153 let asset = secure_commit_chunks(
154 caller,
155 controllers,
156 config,
157 commit_batch,
158 &b,
159 assertions,
160 storage_state,
161 storage_upload,
162 )?;
163 Ok(asset)
164 }
165 }
166}
167
168#[allow(clippy::too_many_arguments)]
169fn secure_commit_chunks(
170 caller: Principal,
171 controllers: &Controllers,
172 config: &StorageConfig,
173 commit_batch: CommitBatch,
174 batch: &Batch,
175 assertions: &impl StorageAssertionsStrategy,
176 storage_state: &impl StorageStateStrategy,
177 storage_upload: &impl StorageUploadStrategy,
178) -> Result<Asset, String> {
179 let rule = assert_commit_batch(caller, controllers, batch, storage_state)?;
180
181 let current = storage_upload.get_asset(
182 &batch.reference_id,
183 &batch.key.collection,
184 &batch.key.full_path,
185 &rule,
186 )?;
187
188 match current {
189 None => {
190 assert_commit_chunks_new_asset(caller, controllers, config, &rule)?;
191
192 commit_chunks(
193 caller,
194 controllers,
195 commit_batch,
196 batch,
197 &rule,
198 &None,
199 assertions,
200 storage_upload,
201 )
202 }
203 Some(current) => secure_commit_chunks_update(
204 caller,
205 controllers,
206 config,
207 commit_batch,
208 batch,
209 rule,
210 current,
211 assertions,
212 storage_upload,
213 ),
214 }
215}
216
217#[allow(clippy::too_many_arguments)]
218fn secure_commit_chunks_update(
219 caller: Principal,
220 controllers: &Controllers,
221 config: &StorageConfig,
222 commit_batch: CommitBatch,
223 batch: &Batch,
224 rule: Rule,
225 current: Asset,
226 assertions: &impl StorageAssertionsStrategy,
227 storage_upload: &impl StorageUploadStrategy,
228) -> Result<Asset, String> {
229 assert_commit_chunks_update(caller, controllers, config, batch, &rule, ¤t)?;
230
231 commit_chunks(
232 caller,
233 controllers,
234 commit_batch,
235 batch,
236 &rule,
237 &Some(current),
238 assertions,
239 storage_upload,
240 )
241}
242
243#[allow(clippy::too_many_arguments)]
244fn commit_chunks(
245 caller: Principal,
246 controllers: &Controllers,
247 commit_batch: CommitBatch,
248 batch: &Batch,
249 rule: &Rule,
250 current: &Option<Asset>,
251 assertions: &impl StorageAssertionsStrategy,
252 storage_upload: &impl StorageUploadStrategy,
253) -> Result<Asset, String> {
254 let now = time();
255
256 if now > batch.expires_at {
257 clear_expired_batches();
258 return Err("Batch did not complete in time. Chunks cannot be committed.".to_string());
259 }
260
261 assert_commit_chunks(
262 caller,
263 controllers,
264 &commit_batch,
265 batch,
266 current,
267 rule,
268 assertions,
269 )?;
270
271 let CommitBatch {
272 chunk_ids,
273 batch_id,
274 headers,
275 } = commit_batch;
276
277 let mut chunks: Vec<Chunk> = vec![];
279
280 for chunk_id in chunk_ids.iter() {
281 let chunk = get_runtime_chunk(chunk_id);
282
283 match chunk {
284 None => {
285 return Err("Chunk does not exist.".to_string());
286 }
287 Some(c) => {
288 if batch_id != c.batch_id {
289 return Err("Chunk not included in the provided batch.".to_string());
290 }
291
292 chunks.push(c);
293 }
294 }
295 }
296
297 chunks.sort_by(|a, b| a.order_id.cmp(&b.order_id));
299
300 let mut content_chunks: Vec<Blob> = vec![];
301
302 for c in chunks.iter() {
304 content_chunks.push(c.content.clone());
305 }
306
307 if content_chunks.is_empty() {
308 return Err("No chunk to commit.".to_string());
309 }
310
311 let owner = current.as_ref().map_or(caller, |asset| asset.key.owner);
314
315 let key = AssetKey {
316 owner,
317 ..batch.clone().key
318 };
319
320 let mut asset: Asset = Asset::prepare(key, headers, current);
321
322 let encoding_type = get_encoding_type(&batch.encoding_type)?;
323
324 let encoding = AssetEncoding::from(&content_chunks);
325
326 match rule.max_size {
327 None => (),
328 Some(max_size) => {
329 if encoding.total_length > max_size {
330 clear_runtime_batch(&batch_id, &chunk_ids);
331 return Err("Asset exceed max allowed size.".to_string());
332 }
333 }
334 }
335
336 storage_upload.insert_asset_encoding(
337 &batch.clone().key.full_path,
338 &encoding_type,
339 &encoding,
340 &mut asset,
341 rule,
342 );
343
344 storage_upload.insert_asset(batch, &asset, rule)?;
345
346 clear_runtime_batch(&batch_id, &chunk_ids);
347
348 Ok(asset)
349}
350
351fn get_encoding_type(encoding_type: &Option<EncodingType>) -> Result<EncodingType, &'static str> {
352 let provided_type = encoding_type
353 .clone()
354 .unwrap_or_else(|| ASSET_ENCODING_NO_COMPRESSION.to_string());
355
356 let matching_type = Vec::from(ENCODING_CERTIFICATION_ORDER)
357 .iter()
358 .any(|&e| *e == provided_type);
359
360 if !matching_type {
361 return Err("Asset encoding not supported for certification purpose.");
362 }
363
364 Ok(provided_type)
365}
366
/// Clears expired runtime state: expired batches first, then expired chunks.
///
/// Called when a batch is created and when a commit is attempted on an expired batch.
fn clear_expired_batches() {
    // Batches are cleared before chunks.
    // NOTE(review): the chunk cleanup presumably depends on which batches remain, so the
    // order of these two calls should be kept — confirm against the runtime module.
    clear_expired_runtime_batches();

    clear_expired_runtime_chunks();
}