junobuild_storage/
store.rs

1use crate::assert::{
2    assert_commit_batch, assert_commit_chunks, assert_commit_chunks_new_asset,
3    assert_commit_chunks_update, assert_create_batch, assert_create_chunk,
4};
5use crate::constants::{ASSET_ENCODING_NO_COMPRESSION, ENCODING_CERTIFICATION_ORDER};
6use crate::errors::JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH;
7use crate::runtime::{
8    clear_batch as clear_runtime_batch, clear_expired_batches as clear_expired_runtime_batches,
9    clear_expired_chunks as clear_expired_runtime_chunks, get_batch as get_runtime_batch,
10    get_chunk as get_runtime_chunk, insert_batch as insert_runtime_batch,
11    insert_chunk as insert_runtime_chunk,
12};
13use crate::strategies::{StorageAssertionsStrategy, StorageStateStrategy, StorageUploadStrategy};
14use crate::types::config::StorageConfig;
15use crate::types::interface::{CommitBatch, InitAssetKey, UploadChunk};
16use crate::types::runtime_state::{BatchId, ChunkId};
17use crate::types::store::{
18    Asset, AssetEncoding, AssetKey, Batch, Chunk, EncodingType, ReferenceId,
19};
20use candid::Principal;
21use ic_cdk::api::time;
22use junobuild_collections::types::rules::Rule;
23use junobuild_shared::types::core::Blob;
24use junobuild_shared::types::state::Controllers;
25use std::ptr::addr_of;
26
27// ---------------------------------------------------------
28// Upload batch and chunks
29// ---------------------------------------------------------
30
// Window within which a batch (and all of its chunks) must be uploaded and
// committed: 300 seconds expressed in nanoseconds (the IC `time()` unit).
const BATCH_EXPIRY_NANOS: u64 = 300_000_000_000;

// Monotonically increasing ids for batches and chunks.
// NOTE(review): `static mut` is presumably safe here because canister
// messages execute single-threaded; all accesses below are in `unsafe`
// blocks and use `addr_of!` to avoid direct `&static mut` references.
static mut NEXT_BATCH_ID: BatchId = 0;
static mut NEXT_CHUNK_ID: ChunkId = 0;
35
36pub fn create_batch(
37    caller: Principal,
38    controllers: &Controllers,
39    config: &StorageConfig,
40    init: InitAssetKey,
41    reference_id: Option<ReferenceId>,
42    storage_state: &impl StorageStateStrategy,
43) -> Result<BatchId, String> {
44    assert_create_batch(caller, controllers, config, &init, storage_state)?;
45
46    // Assert supported encoding type
47    get_encoding_type(&init.encoding_type)?;
48
49    Ok(create_batch_impl(caller, init, reference_id))
50}
51
/// Registers a new upload batch in the runtime state and returns its id.
///
/// The batch stores the future asset's key (owned by `caller`) together
/// with an expiry timestamp (`now + BATCH_EXPIRY_NANOS`); chunks must be
/// uploaded and the batch committed before that deadline.
fn create_batch_impl(
    caller: Principal,
    InitAssetKey {
        token,
        name,
        collection,
        encoding_type,
        full_path,
        description,
    }: InitAssetKey,
    reference_id: Option<ReferenceId>,
) -> BatchId {
    let now = time();

    // SAFETY: canister messages are presumed to execute single-threaded,
    // so the mutable statics cannot be accessed concurrently — TODO confirm.
    unsafe {
        // Opportunistically garbage-collect batches/chunks that timed out.
        clear_expired_batches();

        NEXT_BATCH_ID += 1;

        let key: AssetKey = AssetKey {
            full_path,
            collection,
            // The uploader becomes the owner of the future asset.
            owner: caller,
            token,
            name,
            description,
        };

        // `addr_of!` avoids forming a direct reference to the `static mut`,
        // which newer Rust versions reject.
        insert_runtime_batch(
            &*addr_of!(NEXT_BATCH_ID),
            Batch {
                key,
                reference_id,
                expires_at: now + BATCH_EXPIRY_NANOS,
                encoding_type,
            },
        );

        NEXT_BATCH_ID
    }
}
93
94pub fn create_chunk(
95    caller: Principal,
96    config: &StorageConfig,
97    UploadChunk {
98        batch_id,
99        content,
100        order_id,
101    }: UploadChunk,
102) -> Result<ChunkId, String> {
103    let batch = get_runtime_batch(&batch_id);
104
105    match batch {
106        None => Err("Batch not found.".to_string()),
107        Some(b) => {
108            assert_create_chunk(caller, config, &b)?;
109
110            let now = time();
111
112            // Update batch to extend expires_at
113            insert_runtime_batch(
114                &batch_id,
115                Batch {
116                    expires_at: now + BATCH_EXPIRY_NANOS,
117                    ..b
118                },
119            );
120
121            unsafe {
122                NEXT_CHUNK_ID += 1;
123
124                insert_runtime_chunk(
125                    &*addr_of!(NEXT_CHUNK_ID),
126                    Chunk {
127                        batch_id,
128                        content,
129                        order_id: order_id.unwrap_or(NEXT_CHUNK_ID),
130                    },
131                );
132
133                Ok(NEXT_CHUNK_ID)
134            }
135        }
136    }
137}
138
139pub fn commit_batch(
140    caller: Principal,
141    controllers: &Controllers,
142    config: &StorageConfig,
143    commit_batch: CommitBatch,
144    assertions: &impl StorageAssertionsStrategy,
145    storage_state: &impl StorageStateStrategy,
146    storage_upload: &impl StorageUploadStrategy,
147) -> Result<Asset, String> {
148    let batch = get_runtime_batch(&commit_batch.batch_id);
149
150    match batch {
151        None => Err(JUNO_STORAGE_ERROR_CANNOT_COMMIT_BATCH.to_string()),
152        Some(b) => {
153            let asset = secure_commit_chunks(
154                caller,
155                controllers,
156                config,
157                commit_batch,
158                &b,
159                assertions,
160                storage_state,
161                storage_upload,
162            )?;
163            Ok(asset)
164        }
165    }
166}
167
168#[allow(clippy::too_many_arguments)]
169fn secure_commit_chunks(
170    caller: Principal,
171    controllers: &Controllers,
172    config: &StorageConfig,
173    commit_batch: CommitBatch,
174    batch: &Batch,
175    assertions: &impl StorageAssertionsStrategy,
176    storage_state: &impl StorageStateStrategy,
177    storage_upload: &impl StorageUploadStrategy,
178) -> Result<Asset, String> {
179    let rule = assert_commit_batch(caller, controllers, batch, storage_state)?;
180
181    let current = storage_upload.get_asset(
182        &batch.reference_id,
183        &batch.key.collection,
184        &batch.key.full_path,
185        &rule,
186    )?;
187
188    match current {
189        None => {
190            assert_commit_chunks_new_asset(caller, controllers, config, &rule)?;
191
192            commit_chunks(
193                caller,
194                controllers,
195                commit_batch,
196                batch,
197                &rule,
198                &None,
199                assertions,
200                storage_upload,
201            )
202        }
203        Some(current) => secure_commit_chunks_update(
204            caller,
205            controllers,
206            config,
207            commit_batch,
208            batch,
209            rule,
210            current,
211            assertions,
212            storage_upload,
213        ),
214    }
215}
216
217#[allow(clippy::too_many_arguments)]
218fn secure_commit_chunks_update(
219    caller: Principal,
220    controllers: &Controllers,
221    config: &StorageConfig,
222    commit_batch: CommitBatch,
223    batch: &Batch,
224    rule: Rule,
225    current: Asset,
226    assertions: &impl StorageAssertionsStrategy,
227    storage_upload: &impl StorageUploadStrategy,
228) -> Result<Asset, String> {
229    assert_commit_chunks_update(caller, controllers, config, batch, &rule, &current)?;
230
231    commit_chunks(
232        caller,
233        controllers,
234        commit_batch,
235        batch,
236        &rule,
237        &Some(current),
238        assertions,
239        storage_upload,
240    )
241}
242
243#[allow(clippy::too_many_arguments)]
244fn commit_chunks(
245    caller: Principal,
246    controllers: &Controllers,
247    commit_batch: CommitBatch,
248    batch: &Batch,
249    rule: &Rule,
250    current: &Option<Asset>,
251    assertions: &impl StorageAssertionsStrategy,
252    storage_upload: &impl StorageUploadStrategy,
253) -> Result<Asset, String> {
254    let now = time();
255
256    if now > batch.expires_at {
257        clear_expired_batches();
258        return Err("Batch did not complete in time. Chunks cannot be committed.".to_string());
259    }
260
261    assert_commit_chunks(
262        caller,
263        controllers,
264        &commit_batch,
265        batch,
266        current,
267        rule,
268        assertions,
269    )?;
270
271    let CommitBatch {
272        chunk_ids,
273        batch_id,
274        headers,
275    } = commit_batch;
276
277    // Collect all chunks
278    let mut chunks: Vec<Chunk> = vec![];
279
280    for chunk_id in chunk_ids.iter() {
281        let chunk = get_runtime_chunk(chunk_id);
282
283        match chunk {
284            None => {
285                return Err("Chunk does not exist.".to_string());
286            }
287            Some(c) => {
288                if batch_id != c.batch_id {
289                    return Err("Chunk not included in the provided batch.".to_string());
290                }
291
292                chunks.push(c);
293            }
294        }
295    }
296
297    // Sort with ordering
298    chunks.sort_by(|a, b| a.order_id.cmp(&b.order_id));
299
300    let mut content_chunks: Vec<Blob> = vec![];
301
302    // Collect content
303    for c in chunks.iter() {
304        content_chunks.push(c.content.clone());
305    }
306
307    if content_chunks.is_empty() {
308        return Err("No chunk to commit.".to_string());
309    }
310
311    // We clone the key with the new information provided by the upload (name, full_path, token, etc.) to set the new key.
312    // However, the owner remains the one who originally created the asset.
313    let owner = current.as_ref().map_or(caller, |asset| asset.key.owner);
314
315    let key = AssetKey {
316        owner,
317        ..batch.clone().key
318    };
319
320    let mut asset: Asset = Asset::prepare(key, headers, current);
321
322    let encoding_type = get_encoding_type(&batch.encoding_type)?;
323
324    let encoding = AssetEncoding::from(&content_chunks);
325
326    match rule.max_size {
327        None => (),
328        Some(max_size) => {
329            if encoding.total_length > max_size {
330                clear_runtime_batch(&batch_id, &chunk_ids);
331                return Err("Asset exceed max allowed size.".to_string());
332            }
333        }
334    }
335
336    storage_upload.insert_asset_encoding(
337        &batch.clone().key.full_path,
338        &encoding_type,
339        &encoding,
340        &mut asset,
341        rule,
342    );
343
344    storage_upload.insert_asset(batch, &asset, rule)?;
345
346    clear_runtime_batch(&batch_id, &chunk_ids);
347
348    Ok(asset)
349}
350
351fn get_encoding_type(encoding_type: &Option<EncodingType>) -> Result<EncodingType, &'static str> {
352    let provided_type = encoding_type
353        .clone()
354        .unwrap_or_else(|| ASSET_ENCODING_NO_COMPRESSION.to_string());
355
356    let matching_type = Vec::from(ENCODING_CERTIFICATION_ORDER)
357        .iter()
358        .any(|&e| *e == provided_type);
359
360    if !matching_type {
361        return Err("Asset encoding not supported for certification purpose.");
362    }
363
364    Ok(provided_type)
365}
366
/// Garbage-collects stale upload state from the runtime.
fn clear_expired_batches() {
    // Remove expired batches
    clear_expired_runtime_batches();

    // Remove chunk without existing batches (those we just deleted above)
    clear_expired_runtime_chunks();
}