junobuild_storage/
store.rs

1use crate::assert::{
2    assert_commit_batch, assert_commit_chunks, assert_commit_chunks_new_asset,
3    assert_commit_chunks_update, assert_create_batch, assert_create_chunk,
4};
5use crate::constants::{ASSET_ENCODING_NO_COMPRESSION, ENCODING_CERTIFICATION_ORDER};
6use crate::errors::{
7    JUNO_STORAGE_ERROR_ASSET_MAX_ALLOWED_SIZE, JUNO_STORAGE_ERROR_BATCH_NOT_FOUND,
8    JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH, JUNO_STORAGE_ERROR_CHUNK_NOT_FOUND,
9    JUNO_STORAGE_ERROR_CHUNK_NOT_INCLUDED_IN_BATCH, JUNO_STORAGE_ERROR_CHUNK_TO_COMMIT_NOT_FOUND,
10};
11use crate::runtime::{
12    clear_batch as clear_runtime_batch, clear_expired_batches as clear_expired_runtime_batches,
13    clear_expired_chunks as clear_expired_runtime_chunks, get_batch as get_runtime_batch,
14    get_chunk as get_runtime_chunk, insert_batch as insert_runtime_batch,
15    insert_chunk as insert_runtime_chunk,
16};
17use crate::strategies::{StorageAssertionsStrategy, StorageStateStrategy, StorageUploadStrategy};
18use crate::types::config::StorageConfig;
19use crate::types::interface::{CommitBatch, InitAssetKey, UploadChunk};
20use crate::types::runtime_state::{BatchId, ChunkId};
21use crate::types::store::{
22    Asset, AssetEncoding, AssetKey, Batch, Chunk, EncodingType, ReferenceId,
23};
24use candid::Principal;
25use ic_cdk::api::time;
26use junobuild_collections::types::rules::Rule;
27use junobuild_shared::types::core::Blob;
28use junobuild_shared::types::state::Controllers;
29use std::ptr::addr_of;
30
31// ---------------------------------------------------------
32// Upload batch and chunks
33// ---------------------------------------------------------
34
// Batches expire 300s (5 minutes) after creation; each uploaded chunk
// pushes `expires_at` forward by the same window (see `create_chunk`).
const BATCH_EXPIRY_NANOS: u64 = 300_000_000_000;

// Monotonically increasing identifiers for batches and chunks.
// NOTE(review): mutated only inside the `unsafe` blocks below via
// `addr_of!`; assumed safe because canister execution is single-threaded —
// confirm this crate is only ever used in a canister context.
static mut NEXT_BATCH_ID: BatchId = 0;
static mut NEXT_CHUNK_ID: ChunkId = 0;
39
40pub fn create_batch(
41    caller: Principal,
42    controllers: &Controllers,
43    config: &StorageConfig,
44    init: InitAssetKey,
45    reference_id: Option<ReferenceId>,
46    assertions: &impl StorageAssertionsStrategy,
47    storage_state: &impl StorageStateStrategy,
48) -> Result<BatchId, String> {
49    assert_create_batch(
50        caller,
51        controllers,
52        config,
53        &init,
54        assertions,
55        storage_state,
56    )?;
57
58    // Assert supported encoding type
59    get_encoding_type(&init.encoding_type)?;
60
61    Ok(create_batch_impl(caller, init, reference_id))
62}
63
// Reserve the next batch id and register the batch in the runtime state.
// The caller becomes the owner of the asset being uploaded.
fn create_batch_impl(
    caller: Principal,
    InitAssetKey {
        token,
        name,
        collection,
        encoding_type,
        full_path,
        description,
    }: InitAssetKey,
    reference_id: Option<ReferenceId>,
) -> BatchId {
    let now = time();

    unsafe {
        // Opportunistically evict expired batches (and their orphaned
        // chunks) before registering a new one.
        clear_expired_batches();

        NEXT_BATCH_ID += 1;

        let key: AssetKey = AssetKey {
            full_path,
            collection,
            owner: caller,
            token,
            name,
            description,
        };

        insert_runtime_batch(
            // `addr_of!` avoids taking a plain shared reference to a
            // `static mut` (undefined behavior territory); assumed sound
            // because canister execution is single-threaded — no other
            // reference to the counter can be live during this call.
            &*addr_of!(NEXT_BATCH_ID),
            Batch {
                key,
                reference_id,
                // The batch must be fully uploaded and committed before
                // this deadline, or it will be garbage collected.
                expires_at: now + BATCH_EXPIRY_NANOS,
                encoding_type,
            },
        );

        NEXT_BATCH_ID
    }
}
105
/// Append a chunk of content to an existing upload batch.
///
/// Fails with `JUNO_STORAGE_ERROR_BATCH_NOT_FOUND` when the batch does not
/// exist (or has already expired and been evicted). On success the batch's
/// expiry window is extended and the new chunk id is returned.
pub fn create_chunk(
    caller: Principal,
    config: &StorageConfig,
    UploadChunk {
        batch_id,
        content,
        order_id,
    }: UploadChunk,
) -> Result<ChunkId, String> {
    let batch = get_runtime_batch(&batch_id);

    match batch {
        None => Err(JUNO_STORAGE_ERROR_BATCH_NOT_FOUND.to_string()),
        Some(b) => {
            // Checks the caller against the batch and config limits.
            assert_create_chunk(caller, config, &b)?;

            let now = time();

            // Update batch to extend expires_at
            insert_runtime_batch(
                &batch_id,
                Batch {
                    expires_at: now + BATCH_EXPIRY_NANOS,
                    ..b
                },
            );

            unsafe {
                NEXT_CHUNK_ID += 1;

                insert_runtime_chunk(
                    // `addr_of!` avoids a direct shared reference to the
                    // `static mut`; assumed sound under single-threaded
                    // canister execution.
                    &*addr_of!(NEXT_CHUNK_ID),
                    Chunk {
                        batch_id,
                        content,
                        // Without an explicit order, fall back to the chunk
                        // id itself, which preserves upload order.
                        order_id: order_id.unwrap_or(NEXT_CHUNK_ID),
                    },
                );

                Ok(NEXT_CHUNK_ID)
            }
        }
    }
}
150
151pub fn commit_batch(
152    caller: Principal,
153    controllers: &Controllers,
154    config: &StorageConfig,
155    commit_batch: CommitBatch,
156    assertions: &impl StorageAssertionsStrategy,
157    storage_state: &impl StorageStateStrategy,
158    storage_upload: &impl StorageUploadStrategy,
159) -> Result<Asset, String> {
160    let batch = get_runtime_batch(&commit_batch.batch_id);
161
162    match batch {
163        None => Err(JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH.to_string()),
164        Some(b) => {
165            let asset = secure_commit_chunks(
166                caller,
167                controllers,
168                config,
169                commit_batch,
170                &b,
171                assertions,
172                storage_state,
173                storage_upload,
174            )?;
175            Ok(asset)
176        }
177    }
178}
179
180#[allow(clippy::too_many_arguments)]
181fn secure_commit_chunks(
182    caller: Principal,
183    controllers: &Controllers,
184    config: &StorageConfig,
185    commit_batch: CommitBatch,
186    batch: &Batch,
187    assertions: &impl StorageAssertionsStrategy,
188    storage_state: &impl StorageStateStrategy,
189    storage_upload: &impl StorageUploadStrategy,
190) -> Result<Asset, String> {
191    let rule = assert_commit_batch(caller, controllers, batch, assertions, storage_state)?;
192
193    let current = storage_upload.get_asset(
194        &batch.reference_id,
195        &batch.key.collection,
196        &batch.key.full_path,
197        &rule,
198    )?;
199
200    match current {
201        None => {
202            assert_commit_chunks_new_asset(
203                caller,
204                &batch.key.collection,
205                controllers,
206                config,
207                &rule,
208                assertions,
209            )?;
210
211            commit_chunks(
212                caller,
213                controllers,
214                commit_batch,
215                batch,
216                &rule,
217                &None,
218                assertions,
219                storage_upload,
220            )
221        }
222        Some(current) => secure_commit_chunks_update(
223            caller,
224            controllers,
225            config,
226            commit_batch,
227            batch,
228            rule,
229            current,
230            assertions,
231            storage_upload,
232        ),
233    }
234}
235
236#[allow(clippy::too_many_arguments)]
237fn secure_commit_chunks_update(
238    caller: Principal,
239    controllers: &Controllers,
240    config: &StorageConfig,
241    commit_batch: CommitBatch,
242    batch: &Batch,
243    rule: Rule,
244    current: Asset,
245    assertions: &impl StorageAssertionsStrategy,
246    storage_upload: &impl StorageUploadStrategy,
247) -> Result<Asset, String> {
248    assert_commit_chunks_update(
249        caller,
250        controllers,
251        config,
252        batch,
253        &rule,
254        &current,
255        assertions,
256    )?;
257
258    commit_chunks(
259        caller,
260        controllers,
261        commit_batch,
262        batch,
263        &rule,
264        &Some(current),
265        assertions,
266        storage_upload,
267    )
268}
269
270#[allow(clippy::too_many_arguments)]
271fn commit_chunks(
272    caller: Principal,
273    controllers: &Controllers,
274    commit_batch: CommitBatch,
275    batch: &Batch,
276    rule: &Rule,
277    current: &Option<Asset>,
278    assertions: &impl StorageAssertionsStrategy,
279    storage_upload: &impl StorageUploadStrategy,
280) -> Result<Asset, String> {
281    let now = time();
282
283    if now > batch.expires_at {
284        clear_expired_batches();
285        return Err("Batch did not complete in time. Chunks cannot be committed.".to_string());
286    }
287
288    assert_commit_chunks(
289        caller,
290        controllers,
291        &commit_batch,
292        batch,
293        current,
294        rule,
295        assertions,
296    )?;
297
298    let CommitBatch {
299        chunk_ids,
300        batch_id,
301        headers,
302    } = commit_batch;
303
304    // Collect all chunks
305    let mut chunks: Vec<Chunk> = vec![];
306
307    for chunk_id in chunk_ids.iter() {
308        let chunk = get_runtime_chunk(chunk_id);
309
310        match chunk {
311            None => {
312                return Err(JUNO_STORAGE_ERROR_CHUNK_NOT_FOUND.to_string());
313            }
314            Some(c) => {
315                if batch_id != c.batch_id {
316                    return Err(JUNO_STORAGE_ERROR_CHUNK_NOT_INCLUDED_IN_BATCH.to_string());
317                }
318
319                chunks.push(c);
320            }
321        }
322    }
323
324    // Sort with ordering
325    chunks.sort_by(|a, b| a.order_id.cmp(&b.order_id));
326
327    let mut content_chunks: Vec<Blob> = vec![];
328
329    // Collect content
330    for c in chunks.iter() {
331        content_chunks.push(c.content.clone());
332    }
333
334    if content_chunks.is_empty() {
335        return Err(JUNO_STORAGE_ERROR_CHUNK_TO_COMMIT_NOT_FOUND.to_string());
336    }
337
338    // We clone the key with the new information provided by the upload (name, full_path, token, etc.) to set the new key.
339    // However, the owner remains the one who originally created the asset.
340    let owner = current.as_ref().map_or(caller, |asset| asset.key.owner);
341
342    let key = AssetKey {
343        owner,
344        ..batch.clone().key
345    };
346
347    let mut asset: Asset = Asset::prepare(key, headers, current);
348
349    let encoding_type = get_encoding_type(&batch.encoding_type)?;
350
351    let encoding = AssetEncoding::from(&content_chunks);
352
353    match rule.max_size {
354        None => (),
355        Some(max_size) => {
356            if encoding.total_length > max_size {
357                clear_runtime_batch(&batch_id, &chunk_ids);
358                return Err(JUNO_STORAGE_ERROR_ASSET_MAX_ALLOWED_SIZE.to_string());
359            }
360        }
361    }
362
363    storage_upload.insert_asset_encoding(
364        &batch.reference_id,
365        &batch.key.full_path,
366        &encoding_type,
367        &encoding,
368        &mut asset,
369        rule,
370    )?;
371
372    storage_upload.insert_asset(batch, &asset, rule)?;
373
374    clear_runtime_batch(&batch_id, &chunk_ids);
375
376    Ok(asset)
377}
378
379fn get_encoding_type(encoding_type: &Option<EncodingType>) -> Result<EncodingType, &'static str> {
380    let provided_type = encoding_type
381        .clone()
382        .unwrap_or_else(|| ASSET_ENCODING_NO_COMPRESSION.to_string());
383
384    let matching_type = Vec::from(ENCODING_CERTIFICATION_ORDER)
385        .iter()
386        .any(|&e| *e == provided_type);
387
388    if !matching_type {
389        return Err("Asset encoding not supported for certification purpose.");
390    }
391
392    Ok(provided_type)
393}
394
395fn clear_expired_batches() {
396    // Remove expired batches
397    clear_expired_runtime_batches();
398
399    // Remove chunk without existing batches (those we just deleted above)
400    clear_expired_runtime_chunks();
401}