Skip to main content

runmat_runtime/builtins/io/data/
mod.rs

1//! Cloud-ready dataset persistence builtins (`data.*`).
2
3use std::collections::BTreeMap;
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use runmat_builtins::{ObjectInstance, StructValue, Tensor, Value};
8use runmat_filesystem::data_contract::{DataChunkDescriptor, DataChunkUploadRequest};
9use runmat_macros::runtime_builtin;
10
11use crate::builtins::common::spec::{
12    BroadcastSemantics, BuiltinFusionSpec, BuiltinGpuSpec, ConstantStrategy, GpuOpKind,
13    ReductionNaN, ResidencyPolicy, ShapeRequirements,
14};
15use crate::data::{
16    array_object, data_error, dataset_object, dataset_root, ensure_manifest_sequence,
17    get_object_prop, manifest_path, manifest_version_token, now_rfc3339, parse_schema,
18    parse_string, read_array_payload_async, read_manifest_async, remove_tx, sha256_hex, start_tx,
19    transaction_object, with_tx, with_tx_mut, write_array_payload_async, write_manifest_async,
20    DataArrayMeta, DataArrayPayload, DataChunkIndex, DataChunkIndexEntry, DataManifest,
21    PendingCreateArray, PendingFill, PendingResize, PendingWrite, TxnStatus,
22};
23use crate::{make_cell, BuiltinResult};
24
// Registry metadata describing how the `data.*` builtin family interacts with
// GPU execution. These builtins are host-side I/O and metadata orchestration
// (see `notes`), so the spec advertises no supported precisions and no
// provider hooks, and requests that any device-resident inputs be gathered to
// the host immediately (`ResidencyPolicy::GatherImmediately`).
#[runmat_macros::register_gpu_spec(builtin_path = "crate::builtins::io::data")]
pub const GPU_SPEC: BuiltinGpuSpec = BuiltinGpuSpec {
    name: "data.*",
    op_kind: GpuOpKind::Custom("io-data"),
    supported_precisions: &[],
    broadcast: BroadcastSemantics::None,
    provider_hooks: &[],
    constant_strategy: ConstantStrategy::InlineLiteral,
    residency: ResidencyPolicy::GatherImmediately,
    nan_mode: ReductionNaN::Include,
    two_pass_threshold: None,
    workgroup_size: None,
    accepts_nan_mode: false,
    notes: "Dataset operations are host I/O and metadata orchestration.",
};
40
// Registry metadata for the fusion planner. The `data.*` builtins perform
// side-effecting I/O (see `notes`), so they expose neither elementwise nor
// reduction behavior and cannot participate in kernel fusion.
#[runmat_macros::register_fusion_spec(builtin_path = "crate::builtins::io::data")]
pub const FUSION_SPEC: BuiltinFusionSpec = BuiltinFusionSpec {
    name: "data.*",
    shape: ShapeRequirements::Any,
    constant_strategy: ConstantStrategy::InlineLiteral,
    elementwise: None,
    reduction: None,
    emits_nan: false,
    notes: "Data builtins are side-effecting and not fusible.",
};
51
52#[runtime_builtin(
53    name = "data.create",
54    category = "io/data",
55    summary = "Create a typed dataset at a .data path.",
56    keywords = "data,dataset,create,persistence",
57    sink = true,
58    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
59    builtin_path = "crate::builtins::io::data"
60)]
61async fn data_create_builtin(
62    path: Value,
63    schema: Value,
64    _rest: Vec<Value>,
65) -> BuiltinResult<Value> {
66    let path = parse_string(&path, "data.create path")?;
67    let root = dataset_root(&path);
68    let schema = parse_schema(&schema)?;
69    let now = now_rfc3339();
70    let mut arrays = BTreeMap::new();
71    for (name, mut meta) in schema.arrays {
72        let total = meta.shape.iter().copied().product::<usize>();
73        let payload = DataArrayPayload {
74            dtype: meta.dtype.clone(),
75            shape: meta.shape.clone(),
76            values: vec![0.0; total],
77        };
78        let (payload_path, chunk_index_path) =
79            write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
80        meta.data_path = make_rel_data_path(&root, &payload_path)?;
81        meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
82        arrays.insert(name, meta);
83    }
84
85    let manifest = DataManifest {
86        schema_version: 1,
87        format: "runmat-data".to_string(),
88        dataset_id: crate::data::new_dataset_id(),
89        name: root.file_name().map(|v| v.to_string_lossy().to_string()),
90        created_at: now.clone(),
91        updated_at: now,
92        arrays,
93        attrs: BTreeMap::new(),
94        txn_sequence: 0,
95    };
96    write_manifest_async(&root, &manifest).await?;
97    Ok(dataset_object(&path, &manifest))
98}
99
100#[runtime_builtin(
101    name = "data.open",
102    category = "io/data",
103    summary = "Open a dataset handle from a .data path.",
104    keywords = "data,dataset,open,persistence",
105    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
106    builtin_path = "crate::builtins::io::data"
107)]
108async fn data_open_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
109    let path = parse_string(&path, "data.open path")?;
110    let root = dataset_root(&path);
111    let manifest = read_manifest_async(&root).await?;
112    let mut ds = dataset_object(&path, &manifest);
113    hydrate_dataset_descriptor_async(&path, &mut ds).await;
114    Ok(ds)
115}
116
117#[runtime_builtin(
118    name = "data.exists",
119    category = "io/data",
120    summary = "Check if dataset exists.",
121    keywords = "data,dataset,exists",
122    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
123    builtin_path = "crate::builtins::io::data"
124)]
125async fn data_exists_builtin(path: Value) -> BuiltinResult<Value> {
126    let path = parse_string(&path, "data.exists path")?;
127    let root = dataset_root(&path);
128    let exists = runmat_filesystem::metadata_async(manifest_path(&root))
129        .await
130        .is_ok();
131    Ok(Value::Bool(exists))
132}
133
134#[runtime_builtin(
135    name = "data.delete",
136    category = "io/data",
137    summary = "Delete a dataset path.",
138    keywords = "data,dataset,delete",
139    sink = true,
140    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
141    builtin_path = "crate::builtins::io::data"
142)]
143async fn data_delete_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
144    let path = parse_string(&path, "data.delete path")?;
145    let root = dataset_root(&path);
146    runmat_filesystem::remove_dir_all_async(&root)
147        .await
148        .map_err(|err| {
149            data_error(format!(
150                "data.delete: failed to remove '{}': {err}",
151                root.display()
152            ))
153        })?;
154    Ok(Value::Bool(true))
155}
156
157#[runtime_builtin(
158    name = "data.copy",
159    category = "io/data",
160    summary = "Copy dataset to new path.",
161    keywords = "data,dataset,copy",
162    sink = true,
163    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
164    builtin_path = "crate::builtins::io::data"
165)]
166async fn data_copy_builtin(
167    from_path: Value,
168    to_path: Value,
169    _rest: Vec<Value>,
170) -> BuiltinResult<Value> {
171    let from = parse_string(&from_path, "data.copy fromPath")?;
172    let to = parse_string(&to_path, "data.copy toPath")?;
173    copy_dir_recursive(&dataset_root(&from), &dataset_root(&to)).await?;
174    Ok(Value::Bool(true))
175}
176
177#[runtime_builtin(
178    name = "data.move",
179    category = "io/data",
180    summary = "Move dataset to new path.",
181    keywords = "data,dataset,move",
182    sink = true,
183    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
184    builtin_path = "crate::builtins::io::data"
185)]
186async fn data_move_builtin(
187    from_path: Value,
188    to_path: Value,
189    _rest: Vec<Value>,
190) -> BuiltinResult<Value> {
191    let from = parse_string(&from_path, "data.move fromPath")?;
192    let to = parse_string(&to_path, "data.move toPath")?;
193    runmat_filesystem::rename_async(dataset_root(&from), dataset_root(&to))
194        .await
195        .map_err(|err| {
196            data_error(format!(
197                "data.move: failed to move dataset '{from}' -> '{to}': {err}"
198            ))
199        })?;
200    Ok(Value::Bool(true))
201}
202
203#[runtime_builtin(
204    name = "data.import",
205    category = "io/data",
206    summary = "Import an existing dataset file path.",
207    keywords = "data,dataset,import",
208    sink = true,
209    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
210    builtin_path = "crate::builtins::io::data"
211)]
212async fn data_import_builtin(
213    path: Value,
214    format: Value,
215    source_path: Value,
216    _rest: Vec<Value>,
217) -> BuiltinResult<Value> {
218    let path = parse_string(&path, "data.import path")?;
219    let format = parse_string(&format, "data.import format")?;
220    if !format.eq_ignore_ascii_case("data") {
221        return Err(data_error(
222            "data.import currently supports only format='data'",
223        ));
224    }
225    let source_path = parse_string(&source_path, "data.import sourcePath")?;
226    copy_dir_recursive(&dataset_root(&source_path), &dataset_root(&path)).await?;
227    let manifest = read_manifest_async(&dataset_root(&path)).await?;
228    Ok(dataset_object(&path, &manifest))
229}
230
231#[runtime_builtin(
232    name = "data.export",
233    category = "io/data",
234    summary = "Export dataset to target path.",
235    keywords = "data,dataset,export",
236    sink = true,
237    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
238    builtin_path = "crate::builtins::io::data"
239)]
240async fn data_export_builtin(
241    path: Value,
242    format: Value,
243    target_path: Value,
244    _rest: Vec<Value>,
245) -> BuiltinResult<Value> {
246    let path = parse_string(&path, "data.export path")?;
247    let format = parse_string(&format, "data.export format")?;
248    if !format.eq_ignore_ascii_case("data") {
249        return Err(data_error(
250            "data.export currently supports only format='data'",
251        ));
252    }
253    let target_path = parse_string(&target_path, "data.export targetPath")?;
254    copy_dir_recursive(&dataset_root(&path), &dataset_root(&target_path)).await?;
255    Ok(Value::Bool(true))
256}
257
258#[runtime_builtin(
259    name = "data.list",
260    category = "io/data",
261    summary = "List dataset paths under a prefix.",
262    keywords = "data,dataset,list",
263    type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
264    builtin_path = "crate::builtins::io::data"
265)]
266async fn data_list_builtin(path_prefix: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
267    let prefix = parse_string(&path_prefix, "data.list prefix")?;
268    let root = PathBuf::from(prefix);
269    let entries = runmat_filesystem::read_dir_async(&root)
270        .await
271        .map_err(|err| {
272            data_error(format!(
273                "data.list: failed to read '{}': {err}",
274                root.display()
275            ))
276        })?;
277    let mut values = Vec::new();
278    for entry in entries {
279        if !entry.is_dir() {
280            continue;
281        }
282        let candidate = entry.path();
283        if candidate.extension().and_then(|s| s.to_str()) != Some("data") {
284            continue;
285        }
286        if runmat_filesystem::metadata_async(candidate.join("manifest.json"))
287            .await
288            .is_ok()
289        {
290            values.push(Value::String(candidate.to_string_lossy().to_string()));
291        }
292    }
293    let cols = values.len();
294    make_cell(values, 1, cols).map_err(data_error)
295}
296
297#[runtime_builtin(
298    name = "data.inspect",
299    category = "io/data",
300    summary = "Inspect dataset metadata and schema fields.",
301    keywords = "data,dataset,inspect,schema",
302    type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
303    builtin_path = "crate::builtins::io::data"
304)]
305async fn data_inspect_builtin(path: Value) -> BuiltinResult<Value> {
306    let path = parse_string(&path, "data.inspect path")?;
307    let root = dataset_root(&path);
308    let manifest = read_manifest_async(&root).await?;
309    let mut out = StructValue::new();
310    out.fields.insert("path".to_string(), Value::String(path));
311    out.fields
312        .insert("id".to_string(), Value::String(manifest.dataset_id));
313    out.fields.insert(
314        "arrayCount".to_string(),
315        Value::Num(manifest.arrays.len() as f64),
316    );
317    out.fields
318        .insert("updatedAt".to_string(), Value::String(manifest.updated_at));
319    Ok(Value::Struct(out))
320}
321
322#[runtime_builtin(
323    name = "Dataset.path",
324    category = "io/data",
325    summary = "Return dataset path.",
326    keywords = "dataset,path",
327    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
328    builtin_path = "crate::builtins::io::data"
329)]
330async fn dataset_path_builtin(base: Value) -> BuiltinResult<Value> {
331    let obj = as_object(&base, "Dataset.path")?;
332    Ok(get_object_prop(obj, "__data_path")?.clone())
333}
334
335#[runtime_builtin(
336    name = "Dataset.id",
337    category = "io/data",
338    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
339    builtin_path = "crate::builtins::io::data"
340)]
341async fn dataset_id_builtin(base: Value) -> BuiltinResult<Value> {
342    let obj = as_object(&base, "Dataset.id")?;
343    Ok(get_object_prop(obj, "__data_id")?.clone())
344}
345
346#[runtime_builtin(
347    name = "Dataset.version",
348    category = "io/data",
349    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
350    builtin_path = "crate::builtins::io::data"
351)]
352async fn dataset_version_builtin(base: Value) -> BuiltinResult<Value> {
353    let obj = as_object(&base, "Dataset.version")?;
354    Ok(get_object_prop(obj, "__data_version")?.clone())
355}
356
357#[runtime_builtin(
358    name = "Dataset.arrays",
359    category = "io/data",
360    type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
361    builtin_path = "crate::builtins::io::data"
362)]
363async fn dataset_arrays_builtin(base: Value) -> BuiltinResult<Value> {
364    let path = dataset_path_from_object(&base, "Dataset.arrays")?;
365    let manifest = read_manifest_async(&dataset_root(&path)).await?;
366    let values: Vec<Value> = manifest
367        .arrays
368        .keys()
369        .map(|k| Value::String(k.clone()))
370        .collect();
371    make_cell(values.clone(), 1, values.len()).map_err(data_error)
372}
373
374#[runtime_builtin(
375    name = "Dataset.has_array",
376    category = "io/data",
377    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
378    builtin_path = "crate::builtins::io::data"
379)]
380async fn dataset_has_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
381    let path = dataset_path_from_object(&base, "Dataset.has_array")?;
382    let name = parse_string(&name, "Dataset.has_array name")?;
383    let manifest = read_manifest_async(&dataset_root(&path)).await?;
384    Ok(Value::Bool(manifest.arrays.contains_key(&name)))
385}
386
387#[runtime_builtin(
388    name = "Dataset.array",
389    category = "io/data",
390    type_resolver(crate::builtins::io::type_resolvers::data_array_type),
391    builtin_path = "crate::builtins::io::data"
392)]
393async fn dataset_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
394    let path = dataset_path_from_object(&base, "Dataset.array")?;
395    let name = parse_string(&name, "Dataset.array name")?;
396    let manifest = read_manifest_async(&dataset_root(&path)).await?;
397    if !manifest.arrays.contains_key(&name) {
398        return Err(data_error(format!(
399            "Dataset.array: array '{name}' not found"
400        )));
401    }
402    Ok(array_object(&path, &name))
403}
404
405#[runtime_builtin(
406    name = "Dataset.attrs",
407    category = "io/data",
408    type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
409    builtin_path = "crate::builtins::io::data"
410)]
411async fn dataset_attrs_builtin(base: Value) -> BuiltinResult<Value> {
412    let path = dataset_path_from_object(&base, "Dataset.attrs")?;
413    let manifest = read_manifest_async(&dataset_root(&path)).await?;
414    Ok(attrs_to_struct(&manifest.attrs))
415}
416
417#[runtime_builtin(
418    name = "Dataset.get_attr",
419    category = "io/data",
420    type_resolver(crate::builtins::io::type_resolvers::data_unknown_type),
421    builtin_path = "crate::builtins::io::data"
422)]
423async fn dataset_get_attr_builtin(
424    base: Value,
425    key: Value,
426    rest: Vec<Value>,
427) -> BuiltinResult<Value> {
428    let path = dataset_path_from_object(&base, "Dataset.get_attr")?;
429    let key = parse_string(&key, "Dataset.get_attr key")?;
430    let manifest = read_manifest_async(&dataset_root(&path)).await?;
431    if let Some(value) = manifest.attrs.get(&key) {
432        return Ok(json_to_value(value));
433    }
434    Ok(rest.first().cloned().unwrap_or(Value::Num(0.0)))
435}
436
437#[runtime_builtin(
438    name = "Dataset.set_attr",
439    category = "io/data",
440    sink = true,
441    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
442    builtin_path = "crate::builtins::io::data"
443)]
444async fn dataset_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
445    let path = dataset_path_from_object(&base, "Dataset.set_attr")?;
446    let key = parse_string(&key, "Dataset.set_attr key")?;
447    let root = dataset_root(&path);
448    let mut manifest = read_manifest_async(&root).await?;
449    manifest.attrs.insert(key, value_to_json(&value));
450    manifest.updated_at = now_rfc3339();
451    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
452    write_manifest_async(&root, &manifest).await?;
453    Ok(Value::Bool(true))
454}
455
456#[runtime_builtin(
457    name = "Dataset.set_attrs",
458    category = "io/data",
459    sink = true,
460    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
461    builtin_path = "crate::builtins::io::data"
462)]
463async fn dataset_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
464    let path = dataset_path_from_object(&base, "Dataset.set_attrs")?;
465    let Value::Struct(incoming) = attrs else {
466        return Err(data_error("Dataset.set_attrs: attrs must be a struct"));
467    };
468    let root = dataset_root(&path);
469    let mut manifest = read_manifest_async(&root).await?;
470    for (k, v) in incoming.fields {
471        manifest.attrs.insert(k, value_to_json(&v));
472    }
473    manifest.updated_at = now_rfc3339();
474    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
475    write_manifest_async(&root, &manifest).await?;
476    Ok(Value::Bool(true))
477}
478
479#[runtime_builtin(
480    name = "Dataset.begin",
481    category = "io/data",
482    type_resolver(crate::builtins::io::type_resolvers::data_tx_type),
483    builtin_path = "crate::builtins::io::data"
484)]
485async fn dataset_begin_builtin(base: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
486    let path = dataset_path_from_object(&base, "Dataset.begin")?;
487    let manifest = read_manifest_async(&dataset_root(&path)).await?;
488    let tx_id = start_tx(path.clone(), manifest.txn_sequence);
489    tracing::info!(
490        target: "runmat.data",
491        dataset = path,
492        tx_id = tx_id,
493        base_sequence = manifest.txn_sequence,
494        "data transaction begin"
495    );
496    Ok(transaction_object(&path, &tx_id))
497}
498
499#[runtime_builtin(
500    name = "Dataset.snapshot",
501    category = "io/data",
502    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
503    builtin_path = "crate::builtins::io::data"
504)]
505async fn dataset_snapshot_builtin(
506    base: Value,
507    label: Value,
508    _rest: Vec<Value>,
509) -> BuiltinResult<Value> {
510    let path = dataset_path_from_object(&base, "Dataset.snapshot")?;
511    let label = parse_string(&label, "Dataset.snapshot label")?;
512    let root = dataset_root(&path);
513    let snapshots = root.join(".snapshots");
514    runmat_filesystem::create_dir_all_async(&snapshots)
515        .await
516        .map_err(|err| {
517            data_error(format!(
518                "Dataset.snapshot: failed to create snapshots dir: {err}"
519            ))
520        })?;
521    let src = manifest_path(&root);
522    let dst = snapshots.join(format!("{}.manifest.json", sanitize_label(&label)));
523    copy_file(&src, &dst).await?;
524    Ok(Value::String(dst.to_string_lossy().to_string()))
525}
526
527#[runtime_builtin(
528    name = "Dataset.refresh",
529    category = "io/data",
530    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
531    builtin_path = "crate::builtins::io::data"
532)]
533async fn dataset_refresh_builtin(base: Value) -> BuiltinResult<Value> {
534    let path = dataset_path_from_object(&base, "Dataset.refresh")?;
535    let manifest = read_manifest_async(&dataset_root(&path)).await?;
536    let mut ds = dataset_object(&path, &manifest);
537    hydrate_dataset_descriptor_async(&path, &mut ds).await;
538    Ok(ds)
539}
540
541#[runtime_builtin(
542    name = "DataArray.name",
543    category = "io/data",
544    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
545    builtin_path = "crate::builtins::io::data"
546)]
547async fn data_array_name_builtin(base: Value) -> BuiltinResult<Value> {
548    let obj = as_object(&base, "DataArray.name")?;
549    Ok(get_object_prop(obj, "__array_name")?.clone())
550}
551
552#[runtime_builtin(
553    name = "DataArray.dtype",
554    category = "io/data",
555    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
556    builtin_path = "crate::builtins::io::data"
557)]
558async fn data_array_dtype_builtin(base: Value) -> BuiltinResult<Value> {
559    let (path, name) = array_identity(&base, "DataArray.dtype")?;
560    let manifest = read_manifest_async(&dataset_root(&path)).await?;
561    let meta = manifest
562        .arrays
563        .get(&name)
564        .ok_or_else(|| data_error(format!("DataArray.dtype: array '{name}' not found")))?;
565    Ok(Value::String(meta.dtype.clone()))
566}
567
568#[runtime_builtin(
569    name = "DataArray.shape",
570    category = "io/data",
571    type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
572    builtin_path = "crate::builtins::io::data"
573)]
574async fn data_array_shape_builtin(base: Value) -> BuiltinResult<Value> {
575    let (path, name) = array_identity(&base, "DataArray.shape")?;
576    let manifest = read_manifest_async(&dataset_root(&path)).await?;
577    let meta = manifest
578        .arrays
579        .get(&name)
580        .ok_or_else(|| data_error(format!("DataArray.shape: array '{name}' not found")))?;
581    let values = meta.shape.iter().map(|v| *v as f64).collect::<Vec<_>>();
582    let tensor = Tensor::new(values, vec![1, meta.shape.len()])
583        .map_err(|err| data_error(format!("DataArray.shape: {err}")))?;
584    Ok(Value::Tensor(tensor))
585}
586
587#[runtime_builtin(
588    name = "DataArray.rank",
589    category = "io/data",
590    type_resolver(crate::builtins::io::type_resolvers::data_int_type),
591    builtin_path = "crate::builtins::io::data"
592)]
593async fn data_array_rank_builtin(base: Value) -> BuiltinResult<Value> {
594    let (path, name) = array_identity(&base, "DataArray.rank")?;
595    let manifest = read_manifest_async(&dataset_root(&path)).await?;
596    let meta = manifest
597        .arrays
598        .get(&name)
599        .ok_or_else(|| data_error(format!("DataArray.rank: array '{name}' not found")))?;
600    Ok(Value::Num(meta.shape.len() as f64))
601}
602
603#[runtime_builtin(
604    name = "DataArray.chunk_shape",
605    category = "io/data",
606    type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
607    builtin_path = "crate::builtins::io::data"
608)]
609async fn data_array_chunk_shape_builtin(base: Value) -> BuiltinResult<Value> {
610    let (path, name) = array_identity(&base, "DataArray.chunk_shape")?;
611    let manifest = read_manifest_async(&dataset_root(&path)).await?;
612    let meta = manifest
613        .arrays
614        .get(&name)
615        .ok_or_else(|| data_error(format!("DataArray.chunk_shape: array '{name}' not found")))?;
616    let values = meta
617        .chunk_shape
618        .iter()
619        .map(|v| *v as f64)
620        .collect::<Vec<_>>();
621    let tensor = Tensor::new(values, vec![1, meta.chunk_shape.len()])
622        .map_err(|err| data_error(format!("DataArray.chunk_shape: {err}")))?;
623    Ok(Value::Tensor(tensor))
624}
625
626#[runtime_builtin(
627    name = "DataArray.codec",
628    category = "io/data",
629    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
630    builtin_path = "crate::builtins::io::data"
631)]
632async fn data_array_codec_builtin(base: Value) -> BuiltinResult<Value> {
633    let (path, name) = array_identity(&base, "DataArray.codec")?;
634    let manifest = read_manifest_async(&dataset_root(&path)).await?;
635    let meta = manifest
636        .arrays
637        .get(&name)
638        .ok_or_else(|| data_error(format!("DataArray.codec: array '{name}' not found")))?;
639    Ok(Value::String(meta.codec.clone()))
640}
641
642#[runtime_builtin(
643    name = "DataArray.read",
644    category = "io/data",
645    type_resolver(crate::builtins::io::type_resolvers::data_tensor_type),
646    builtin_path = "crate::builtins::io::data"
647)]
648async fn data_array_read_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
649    let (path, name) = array_identity(&base, "DataArray.read")?;
650    let root = dataset_root(&path);
651    let manifest = read_manifest_async(&root).await?;
652    let meta = manifest
653        .arrays
654        .get(&name)
655        .ok_or_else(|| data_error(format!("DataArray.read: array '{name}' not found")))?;
656    let payload = read_array_payload_async(&root, meta).await?;
657    let sliced = if let Some(slice_spec) = rest.first() {
658        read_slice_payload(&payload, slice_spec)?
659    } else {
660        payload
661    };
662    let tensor = Tensor::new(sliced.values, sliced.shape)
663        .map_err(|err| data_error(format!("DataArray.read: {err}")))?;
664    Ok(Value::Tensor(tensor))
665}
666
667#[runtime_builtin(
668    name = "DataArray.write",
669    category = "io/data",
670    sink = true,
671    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
672    builtin_path = "crate::builtins::io::data"
673)]
674async fn data_array_write_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
675    let (path, name) = array_identity(&base, "DataArray.write")?;
676    let (slice_spec, value) = match rest.as_slice() {
677        [v] => (None, v),
678        [slice, v] => (Some(slice), v),
679        _ => {
680            return Err(data_error(
681                "DataArray.write expects values or (sliceSpec, values) arguments",
682            ))
683        }
684    };
685    write_array_full_async(&path, &name, slice_spec, value).await?;
686    Ok(Value::Bool(true))
687}
688
689#[runtime_builtin(
690    name = "DataArray.resize",
691    category = "io/data",
692    sink = true,
693    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
694    builtin_path = "crate::builtins::io::data"
695)]
696async fn data_array_resize_builtin(
697    base: Value,
698    new_shape: Value,
699    _rest: Vec<Value>,
700) -> BuiltinResult<Value> {
701    let (path, name) = array_identity(&base, "DataArray.resize")?;
702    let shape = parse_shape_from_value(&new_shape)?;
703    let root = dataset_root(&path);
704    let mut manifest = read_manifest_async(&root).await?;
705    let meta = manifest
706        .arrays
707        .get_mut(&name)
708        .ok_or_else(|| data_error(format!("DataArray.resize: array '{name}' not found")))?;
709    meta.shape = shape.clone();
710    let payload = DataArrayPayload {
711        dtype: meta.dtype.clone(),
712        shape: shape.clone(),
713        values: vec![0.0; shape.iter().copied().product()],
714    };
715    let (payload_path, chunk_index_path) =
716        write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
717    meta.data_path = make_rel_data_path(&root, &payload_path)?;
718    meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
719    manifest.updated_at = now_rfc3339();
720    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
721    write_manifest_async(&root, &manifest).await?;
722    Ok(Value::Bool(true))
723}
724
725#[runtime_builtin(
726    name = "DataArray.fill",
727    category = "io/data",
728    sink = true,
729    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
730    builtin_path = "crate::builtins::io::data"
731)]
732async fn data_array_fill_builtin(
733    base: Value,
734    value: Value,
735    _rest: Vec<Value>,
736) -> BuiltinResult<Value> {
737    let (path, name) = array_identity(&base, "DataArray.fill")?;
738    let root = dataset_root(&path);
739    let mut manifest = read_manifest_async(&root).await?;
740    let meta = manifest
741        .arrays
742        .get_mut(&name)
743        .ok_or_else(|| data_error(format!("DataArray.fill: array '{name}' not found")))?;
744    let scalar = scalar_to_f64(&value)?;
745    let payload = DataArrayPayload {
746        dtype: meta.dtype.clone(),
747        shape: meta.shape.clone(),
748        values: vec![scalar; meta.shape.iter().copied().product()],
749    };
750    let (payload_path, chunk_index_path) =
751        write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
752    meta.data_path = make_rel_data_path(&root, &payload_path)?;
753    meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
754    manifest.updated_at = now_rfc3339();
755    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
756    write_manifest_async(&root, &manifest).await?;
757    Ok(Value::Bool(true))
758}
759
760#[runtime_builtin(
761    name = "DataTransaction.id",
762    category = "io/data",
763    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
764    builtin_path = "crate::builtins::io::data"
765)]
766async fn data_tx_id_builtin(base: Value) -> BuiltinResult<Value> {
767    let obj = as_object(&base, "DataTransaction.id")?;
768    Ok(get_object_prop(obj, "__tx_id")?.clone())
769}
770
771#[runtime_builtin(
772    name = "DataTransaction.write",
773    category = "io/data",
774    sink = true,
775    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
776    builtin_path = "crate::builtins::io::data"
777)]
778async fn data_tx_write_builtin(
779    base: Value,
780    array_name: Value,
781    slice: Value,
782    values: Value,
783    _rest: Vec<Value>,
784) -> BuiltinResult<Value> {
785    let tx_id = tx_id_from_object(&base, "DataTransaction.write")?;
786    let array_name = parse_string(&array_name, "DataTransaction.write arrayName")?;
787    with_tx_mut(&tx_id, |tx| {
788        if tx.status != TxnStatus::Open {
789            return Err(data_error("DataTransaction.write: transaction is not open"));
790        }
791        tx.writes.push(PendingWrite {
792            array: array_name,
793            slice_spec: Some(slice),
794            value: values,
795        });
796        Ok(())
797    })?;
798    Ok(Value::Bool(true))
799}
800
801#[runtime_builtin(
802    name = "DataTransaction.set_attr",
803    category = "io/data",
804    sink = true,
805    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
806    builtin_path = "crate::builtins::io::data"
807)]
808async fn data_tx_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
809    let tx_id = tx_id_from_object(&base, "DataTransaction.set_attr")?;
810    let key = parse_string(&key, "DataTransaction.set_attr key")?;
811    with_tx_mut(&tx_id, |tx| {
812        if tx.status != TxnStatus::Open {
813            return Err(data_error(
814                "DataTransaction.set_attr: transaction is not open",
815            ));
816        }
817        tx.attrs.insert(key, value);
818        Ok(())
819    })?;
820    Ok(Value::Bool(true))
821}
822
823#[runtime_builtin(
824    name = "DataTransaction.set_attrs",
825    category = "io/data",
826    sink = true,
827    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
828    builtin_path = "crate::builtins::io::data"
829)]
830async fn data_tx_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
831    let tx_id = tx_id_from_object(&base, "DataTransaction.set_attrs")?;
832    let Value::Struct(incoming) = attrs else {
833        return Err(data_error(
834            "DataTransaction.set_attrs: attrs must be struct",
835        ));
836    };
837    with_tx_mut(&tx_id, |tx| {
838        if tx.status != TxnStatus::Open {
839            return Err(data_error(
840                "DataTransaction.set_attrs: transaction is not open",
841            ));
842        }
843        for (k, v) in incoming.fields {
844            tx.attrs.insert(k, v);
845        }
846        Ok(())
847    })?;
848    Ok(Value::Bool(true))
849}
850
851#[runtime_builtin(
852    name = "DataTransaction.resize",
853    category = "io/data",
854    sink = true,
855    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
856    builtin_path = "crate::builtins::io::data"
857)]
858async fn data_tx_resize_builtin(
859    base: Value,
860    array_name: Value,
861    new_shape: Value,
862    _rest: Vec<Value>,
863) -> BuiltinResult<Value> {
864    let tx_id = tx_id_from_object(&base, "DataTransaction.resize")?;
865    let array_name = parse_string(&array_name, "DataTransaction.resize arrayName")?;
866    let shape = parse_shape_from_value(&new_shape)?;
867    with_tx_mut(&tx_id, |tx| {
868        if tx.status != TxnStatus::Open {
869            return Err(data_error(
870                "DataTransaction.resize: transaction is not open",
871            ));
872        }
873        tx.resizes.push(PendingResize {
874            array: array_name,
875            shape,
876        });
877        Ok(())
878    })?;
879    Ok(Value::Bool(true))
880}
881
882#[runtime_builtin(
883    name = "DataTransaction.fill",
884    category = "io/data",
885    sink = true,
886    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
887    builtin_path = "crate::builtins::io::data"
888)]
889async fn data_tx_fill_builtin(
890    base: Value,
891    array_name: Value,
892    value: Value,
893    rest: Vec<Value>,
894) -> BuiltinResult<Value> {
895    let tx_id = tx_id_from_object(&base, "DataTransaction.fill")?;
896    let array_name = parse_string(&array_name, "DataTransaction.fill arrayName")?;
897    let slice_spec = rest.first().cloned();
898    with_tx_mut(&tx_id, |tx| {
899        if tx.status != TxnStatus::Open {
900            return Err(data_error("DataTransaction.fill: transaction is not open"));
901        }
902        tx.fills.push(PendingFill {
903            array: array_name,
904            slice_spec,
905            value,
906        });
907        Ok(())
908    })?;
909    Ok(Value::Bool(true))
910}
911
912#[runtime_builtin(
913    name = "DataTransaction.delete_array",
914    category = "io/data",
915    sink = true,
916    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
917    builtin_path = "crate::builtins::io::data"
918)]
919async fn data_tx_delete_array_builtin(base: Value, array_name: Value) -> BuiltinResult<Value> {
920    let tx_id = tx_id_from_object(&base, "DataTransaction.delete_array")?;
921    let array_name = parse_string(&array_name, "DataTransaction.delete_array arrayName")?;
922    with_tx_mut(&tx_id, |tx| {
923        if tx.status != TxnStatus::Open {
924            return Err(data_error(
925                "DataTransaction.delete_array: transaction is not open",
926            ));
927        }
928        tx.delete_arrays.push(array_name);
929        Ok(())
930    })?;
931    Ok(Value::Bool(true))
932}
933
934#[runtime_builtin(
935    name = "DataTransaction.create_array",
936    category = "io/data",
937    sink = true,
938    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
939    builtin_path = "crate::builtins::io::data"
940)]
941async fn data_tx_create_array_builtin(
942    base: Value,
943    array_name: Value,
944    meta: Value,
945) -> BuiltinResult<Value> {
946    let tx_id = tx_id_from_object(&base, "DataTransaction.create_array")?;
947    let array_name = parse_string(&array_name, "DataTransaction.create_array arrayName")?;
948    let meta = parse_array_meta(&array_name, &meta)?;
949    with_tx_mut(&tx_id, |tx| {
950        if tx.status != TxnStatus::Open {
951            return Err(data_error(
952                "DataTransaction.create_array: transaction is not open",
953            ));
954        }
955        tx.create_arrays.push(PendingCreateArray {
956            array: array_name,
957            meta,
958        });
959        Ok(())
960    })?;
961    Ok(Value::Bool(true))
962}
963
#[runtime_builtin(
    name = "DataTransaction.commit",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
/// Apply every operation buffered on an open transaction to the dataset,
/// bump the manifest sequence, and retire the transaction.
///
/// Application order: creates, resizes, fills, writes, deletes, then
/// attribute updates. An optional trailing options struct may carry an
/// `if_manifest` field: commit fails with `MANIFEST_CONFLICT` unless it
/// matches the current manifest version token.
///
/// Returns `true` on success. On failure the transaction stays registered
/// and `Open`; note that steps applied before a failing step may already
/// have written payload files, although the manifest itself is written only
/// after all steps succeed.
async fn data_tx_commit_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.commit")?;
    // Snapshot the buffered operations so the transaction registry is not
    // held across the async I/O below.
    let (dataset_path, base_sequence, writes, resizes, fills, create_arrays, delete_arrays, attrs) =
        with_tx(&tx_id, |tx| {
            if tx.status != TxnStatus::Open {
                return Err(data_error(
                    "DataTransaction.commit: transaction is not open",
                ));
            }
            Ok((
                tx.dataset_path.clone(),
                tx.base_sequence,
                tx.writes.clone(),
                tx.resizes.clone(),
                tx.fills.clone(),
                tx.create_arrays.clone(),
                tx.delete_arrays.clone(),
                tx.attrs.clone(),
            ))
        })?;

    // Operation counts captured for the structured log line emitted after commit.
    let write_ops = writes.len();
    let resize_ops = resizes.len();
    let fill_ops = fills.len();
    let create_ops = create_arrays.len();
    let delete_ops = delete_arrays.len();
    let attr_updates = attrs.len();

    let root = dataset_root(&dataset_path);
    let mut manifest = read_manifest_async(&root).await?;
    // Reject the commit if the manifest advanced past the sequence the
    // transaction was opened against.
    ensure_manifest_sequence(base_sequence, &manifest)?;
    // Optional optimistic-concurrency precondition on the manifest version.
    if let Some(Value::Struct(options)) = rest.first() {
        if let Some(expected) = options.fields.get("if_manifest") {
            let expected = parse_string(expected, "DataTransaction.commit if_manifest")?;
            let actual = manifest_version_token(&manifest);
            if expected != actual {
                tracing::warn!(
                    target: "runmat.data",
                    tx_id = tx_id,
                    expected_manifest = expected,
                    actual_manifest = actual,
                    "data transaction manifest conflict"
                );
                return Err(data_error(
                    "MANIFEST_CONFLICT: if_manifest precondition failed",
                ));
            }
        }
    }
    // Apply staged operations in a fixed order against the in-memory manifest.
    for create in create_arrays {
        create_array_in_manifest(&root, &mut manifest, &create.array, create.meta).await?;
    }
    for resize in resizes {
        resize_array_in_manifest(&root, &mut manifest, &resize.array, resize.shape).await?;
    }
    for fill in fills {
        fill_array_in_manifest(
            &root,
            &mut manifest,
            &fill.array,
            fill.slice_spec.as_ref(),
            &fill.value,
        )
        .await?;
    }
    for write in writes {
        apply_write_to_manifest_async(
            &root,
            &mut manifest,
            &write.array,
            write.slice_spec.as_ref(),
            &write.value,
        )
        .await?;
    }
    for array_name in delete_arrays {
        delete_array_in_manifest_async(&root, &mut manifest, &array_name).await?;
    }
    for (k, v) in attrs {
        manifest.attrs.insert(k, value_to_json(&v));
    }
    // Persist the updated manifest with a bumped (saturating) sequence number.
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    with_tx_mut(&tx_id, |tx| {
        tx.status = TxnStatus::Committed;
        Ok(())
    })?;
    tracing::info!(
        target: "runmat.data",
        dataset = dataset_path,
        tx_id = tx_id,
        write_ops = write_ops,
        resize_ops = resize_ops,
        fill_ops = fill_ops,
        create_ops = create_ops,
        delete_ops = delete_ops,
        attr_updates = attr_updates,
        next_sequence = manifest.txn_sequence,
        "data transaction commit"
    );
    // Committed transactions are dropped from the registry; later method
    // calls on the handle will fail to resolve this id.
    remove_tx(&tx_id);
    Ok(Value::Bool(true))
}
1075
1076#[runtime_builtin(
1077    name = "commit",
1078    category = "io/data",
1079    sink = true,
1080    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1081    builtin_path = "crate::builtins::io::data"
1082)]
1083async fn data_tx_commit_alias_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1084    match &base {
1085        Value::Object(obj) if obj.class_name == "DataTransaction" => {
1086            data_tx_commit_builtin(base, rest).await
1087        }
1088        Value::HandleObject(handle) if handle.class_name == "DataTransaction" => {
1089            data_tx_commit_builtin(base, rest).await
1090        }
1091        _ => Err(data_error(
1092            "commit: receiver must be a DataTransaction (use tx = ds.begin())",
1093        )),
1094    }
1095}
1096
1097#[runtime_builtin(
1098    name = "DataTransaction.abort",
1099    category = "io/data",
1100    sink = true,
1101    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1102    builtin_path = "crate::builtins::io::data"
1103)]
1104async fn data_tx_abort_builtin(base: Value) -> BuiltinResult<Value> {
1105    let tx_id = tx_id_from_object(&base, "DataTransaction.abort")?;
1106    with_tx_mut(&tx_id, |tx| {
1107        tx.status = TxnStatus::Aborted;
1108        Ok(())
1109    })?;
1110    tracing::info!(
1111        target: "runmat.data",
1112        tx_id = tx_id,
1113        "data transaction abort"
1114    );
1115    remove_tx(&tx_id);
1116    Ok(Value::Bool(true))
1117}
1118
1119#[runtime_builtin(
1120    name = "DataTransaction.status",
1121    category = "io/data",
1122    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1123    builtin_path = "crate::builtins::io::data"
1124)]
1125async fn data_tx_status_builtin(base: Value) -> BuiltinResult<Value> {
1126    let tx_id = tx_id_from_object(&base, "DataTransaction.status")?;
1127    with_tx(&tx_id, |tx| {
1128        let status = match tx.status {
1129            TxnStatus::Open => "open",
1130            TxnStatus::Committed => "committed",
1131            TxnStatus::Aborted => "aborted",
1132        };
1133        Ok(Value::String(status.to_string()))
1134    })
1135}
1136
1137fn dataset_path_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
1138    let obj = as_object(base, context)?;
1139    parse_string(get_object_prop(obj, "__data_path")?, context)
1140}
1141
1142fn tx_id_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
1143    let obj = as_object(base, context)?;
1144    parse_string(get_object_prop(obj, "__tx_id")?, context)
1145}
1146
1147fn array_identity(base: &Value, context: &str) -> BuiltinResult<(String, String)> {
1148    let obj = as_object(base, context)?;
1149    let path = parse_string(get_object_prop(obj, "__data_path")?, context)?;
1150    let name = parse_string(get_object_prop(obj, "__array_name")?, context)?;
1151    Ok((path, name))
1152}
1153
1154fn as_object<'a>(value: &'a Value, context: &str) -> BuiltinResult<&'a ObjectInstance> {
1155    match value {
1156        Value::Object(obj) => Ok(obj),
1157        _ => Err(data_error(format!("{context}: expected object receiver"))),
1158    }
1159}
1160
1161async fn hydrate_dataset_descriptor_async(path: &str, dataset: &mut Value) {
1162    let request = runmat_filesystem::data_contract::DataManifestRequest {
1163        path: path.to_string(),
1164        version: None,
1165    };
1166    let descriptor = match runmat_filesystem::data_manifest_descriptor_async(&request).await {
1167        Ok(descriptor) => descriptor,
1168        Err(_) => return,
1169    };
1170    let Value::Object(obj) = dataset else {
1171        return;
1172    };
1173    if !descriptor.dataset_id.is_empty() {
1174        obj.properties.insert(
1175            "__data_id".to_string(),
1176            Value::String(descriptor.dataset_id),
1177        );
1178    }
1179    obj.properties.insert(
1180        "__data_version".to_string(),
1181        Value::String(format!(
1182            "{}:{}",
1183            descriptor.updated_at, descriptor.txn_sequence
1184        )),
1185    );
1186}
1187
/// Replace every character that is not ASCII-alphanumeric, `-`, or `_`
/// with an underscore, yielding a filesystem-safe label.
fn sanitize_label(label: &str) -> String {
    let mut out = String::with_capacity(label.len());
    for ch in label.chars() {
        if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
            out.push(ch);
        } else {
            out.push('_');
        }
    }
    out
}
1200
1201async fn copy_file(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
1202    let bytes = runmat_filesystem::read_async(src)
1203        .await
1204        .map_err(|err| data_error(format!("failed to open '{}': {err}", src.display())))?;
1205    let parent = dst.parent().ok_or_else(|| {
1206        data_error(format!(
1207            "invalid destination path '{}': missing parent",
1208            dst.display()
1209        ))
1210    })?;
1211    runmat_filesystem::create_dir_all_async(parent)
1212        .await
1213        .map_err(|err| data_error(format!("failed to create '{}': {err}", parent.display())))?;
1214    runmat_filesystem::write_async(dst, &bytes)
1215        .await
1216        .map_err(|err| {
1217            data_error(format!(
1218                "failed to copy '{}' -> '{}': {err}",
1219                src.display(),
1220                dst.display()
1221            ))
1222        })?;
1223    Ok(())
1224}
1225
1226fn make_rel_data_path(
1227    root: &std::path::Path,
1228    payload_path: &std::path::Path,
1229) -> BuiltinResult<String> {
1230    let rel = payload_path
1231        .strip_prefix(root)
1232        .map_err(|err| data_error(format!("failed to compute relative data path: {err}")))?;
1233    Ok(rel.to_string_lossy().to_string())
1234}
1235
1236async fn create_array_in_manifest(
1237    root: &std::path::Path,
1238    manifest: &mut DataManifest,
1239    array_name: &str,
1240    mut meta: DataArrayMeta,
1241) -> BuiltinResult<()> {
1242    if manifest.arrays.contains_key(array_name) {
1243        return Err(data_error(format!(
1244            "DataTransaction.create_array: array '{array_name}' already exists"
1245        )));
1246    }
1247    let payload = DataArrayPayload {
1248        dtype: meta.dtype.clone(),
1249        shape: meta.shape.clone(),
1250        values: vec![0.0; meta.shape.iter().copied().product()],
1251    };
1252    let (payload_path, chunk_index_path) =
1253        write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
1254    meta.data_path = make_rel_data_path(root, &payload_path)?;
1255    meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
1256    manifest.arrays.insert(array_name.to_string(), meta);
1257    Ok(())
1258}
1259
/// Resize `array_name` to `shape`, rewriting its payload and updating the
/// manifest entry's paths.
///
/// NOTE(review): the replacement payload is zero-filled, so a resize
/// discards any existing array contents rather than preserving the
/// overlapping region — confirm this is the intended semantics.
async fn resize_array_in_manifest(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    shape: Vec<usize>,
) -> BuiltinResult<()> {
    let meta = manifest
        .arrays
        .get_mut(array_name)
        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
    meta.shape = shape.clone();
    let payload = DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: shape.clone(),
        // Zero-filled buffer sized to the product of the new dimensions.
        values: vec![0.0; shape.iter().copied().product()],
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
    meta.data_path = make_rel_data_path(root, &payload_path)?;
    meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
    Ok(())
}
1282
/// Overwrite `array_name` — or, when `slice_spec` is given, only that slice —
/// with a single scalar value, then persist the payload and refresh the
/// manifest entry's shape and paths.
async fn fill_array_in_manifest(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    slice_spec: Option<&Value>,
    value: &Value,
) -> BuiltinResult<()> {
    let meta: DataArrayMeta = manifest
        .arrays
        .get(array_name)
        .cloned()
        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
    let scalar = scalar_to_f64(value)?;
    let payload = read_array_payload_async(root, &meta).await?;
    let next_payload = if let Some(slice_spec) = slice_spec {
        // Sliced fill: materialize a constant tensor shaped like the target
        // slice and route it through the generic slice writer.
        let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
        let target_shape: Vec<usize> = ranges
            .iter()
            .map(|r| r.end.saturating_sub(r.start))
            .collect();
        let rhs = Value::Tensor(
            Tensor::new(
                vec![scalar; target_shape.iter().copied().product()],
                target_shape,
            )
            .map_err(|err| data_error(format!("DataTransaction.fill: {err}")))?,
        );
        write_slice_payload(&payload, slice_spec, &rhs)?
    } else {
        // Full fill: same shape, every element set to the scalar.
        DataArrayPayload {
            dtype: payload.dtype,
            shape: payload.shape.clone(),
            values: vec![scalar; payload.shape.iter().copied().product()],
        }
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
    if let Some(updated) = manifest.arrays.get_mut(array_name) {
        updated.shape = next_payload.shape.clone();
        updated.data_path = make_rel_data_path(root, &payload_path)?;
        updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
    }
    Ok(())
}
1327
1328async fn delete_array_in_manifest_async(
1329    root: &std::path::Path,
1330    manifest: &mut DataManifest,
1331    array_name: &str,
1332) -> BuiltinResult<()> {
1333    let removed = manifest.arrays.remove(array_name);
1334    if removed.is_none() {
1335        return Err(data_error(format!(
1336            "DataTransaction.delete_array: array '{array_name}' not found"
1337        )));
1338    }
1339    let array_dir = root.join("arrays").join(array_name);
1340    if runmat_filesystem::metadata_async(&array_dir).await.is_ok() {
1341        runmat_filesystem::remove_dir_all_async(&array_dir)
1342            .await
1343            .map_err(|err| {
1344                data_error(format!(
1345                    "DataTransaction.delete_array: failed to remove '{}': {err}",
1346                    array_dir.display()
1347                ))
1348            })?;
1349    }
1350    Ok(())
1351}
1352
1353fn parse_array_meta(array_name: &str, meta: &Value) -> BuiltinResult<DataArrayMeta> {
1354    let Value::Struct(meta_struct) = meta else {
1355        return Err(data_error(
1356            "DataTransaction.create_array: meta must be a struct",
1357        ));
1358    };
1359    let dtype = meta_struct
1360        .fields
1361        .get("dtype")
1362        .map(|v| parse_string(v, "DataTransaction.create_array dtype"))
1363        .transpose()?
1364        .unwrap_or_else(|| "f64".to_string());
1365    let shape = meta_struct
1366        .fields
1367        .get("shape")
1368        .map(parse_shape_from_value)
1369        .transpose()?
1370        .unwrap_or_else(|| vec![0, 0]);
1371    let chunk_shape = meta_struct
1372        .fields
1373        .get("chunk")
1374        .map(parse_shape_from_value)
1375        .transpose()?
1376        .unwrap_or_else(|| default_chunk_shape(&shape));
1377    let codec = meta_struct
1378        .fields
1379        .get("codec")
1380        .map(|v| parse_string(v, "DataTransaction.create_array codec"))
1381        .transpose()?
1382        .unwrap_or_else(|| "zstd".to_string());
1383    Ok(DataArrayMeta {
1384        dtype,
1385        shape,
1386        chunk_shape,
1387        order: "column_major".to_string(),
1388        codec,
1389        chunk_index_path: Some(format!("arrays/{array_name}/chunks/index.json")),
1390        data_path: format!("arrays/{array_name}/data.f64.json"),
1391    })
1392}
1393
/// Derive a default chunk shape from an array shape: 1024 for an empty
/// shape, up to 65 536 for 1-D, and for N-D the first two dimensions capped
/// at 256 with remaining dimensions capped at 8 (all at least 1).
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape.len() {
        0 => vec![1024],
        1 => vec![shape[0].clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(dim, &extent)| {
                let cap = if dim < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
1410
1411#[async_recursion::async_recursion(?Send)]
1412async fn copy_dir_recursive(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
1413    let metadata = runmat_filesystem::metadata_async(src)
1414        .await
1415        .map_err(|err| data_error(format!("failed to stat '{}': {err}", src.display())))?;
1416    if !metadata.is_dir() {
1417        return Err(data_error(format!(
1418            "expected dataset directory at '{}'",
1419            src.display()
1420        )));
1421    }
1422    runmat_filesystem::create_dir_all_async(dst)
1423        .await
1424        .map_err(|err| data_error(format!("failed to create '{}': {err}", dst.display())))?;
1425    for entry in runmat_filesystem::read_dir_async(src)
1426        .await
1427        .map_err(|err| data_error(format!("failed to read '{}': {err}", src.display())))?
1428    {
1429        let entry_src = entry.path().to_path_buf();
1430        let entry_dst = dst.join(entry.file_name());
1431        if entry.is_dir() {
1432            copy_dir_recursive(&entry_src, &entry_dst).await?;
1433            continue;
1434        }
1435        copy_file(&entry_src, &entry_dst).await?;
1436    }
1437    Ok(())
1438}
1439
1440fn parse_shape_from_value(value: &Value) -> BuiltinResult<Vec<usize>> {
1441    match value {
1442        Value::Tensor(t) => {
1443            let mut out = Vec::with_capacity(t.data.len());
1444            for v in &t.data {
1445                if !v.is_finite() || *v < 0.0 {
1446                    return Err(data_error(
1447                        "shape dimensions must be non-negative finite numbers",
1448                    ));
1449                }
1450                out.push(*v as usize);
1451            }
1452            Ok(out)
1453        }
1454        Value::Num(v) => {
1455            if !v.is_finite() || *v < 0.0 {
1456                return Err(data_error(
1457                    "shape dimensions must be non-negative finite numbers",
1458                ));
1459            }
1460            Ok(vec![*v as usize])
1461        }
1462        Value::Int(v) => {
1463            let n = v.to_i64();
1464            if n < 0 {
1465                return Err(data_error("shape dimensions must be non-negative"));
1466            }
1467            Ok(vec![n as usize])
1468        }
1469        _ => Err(data_error("shape must be a numeric vector")),
1470    }
1471}
1472
1473fn scalar_to_f64(value: &Value) -> BuiltinResult<f64> {
1474    match value {
1475        Value::Num(v) => Ok(*v),
1476        Value::Int(v) => Ok(v.to_i64() as f64),
1477        _ => Err(data_error("expected numeric scalar")),
1478    }
1479}
1480
1481async fn write_array_full_async(
1482    dataset_path: &str,
1483    array_name: &str,
1484    slice_spec: Option<&Value>,
1485    value: &Value,
1486) -> BuiltinResult<()> {
1487    let root = dataset_root(dataset_path);
1488    let mut manifest = read_manifest_async(&root).await?;
1489    apply_write_to_manifest_async(&root, &mut manifest, array_name, slice_spec, value).await?;
1490    manifest.updated_at = now_rfc3339();
1491    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1492    write_manifest_async(&root, &manifest).await
1493}
1494
1495async fn apply_write_to_manifest_async(
1496    root: &std::path::Path,
1497    manifest: &mut DataManifest,
1498    array_name: &str,
1499    slice_spec: Option<&Value>,
1500    value: &Value,
1501) -> BuiltinResult<()> {
1502    let meta: DataArrayMeta = manifest
1503        .arrays
1504        .get(array_name)
1505        .cloned()
1506        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
1507
1508    if let Some(slice_spec) = slice_spec {
1509        if apply_slice_write_chunked_async(root, manifest, array_name, &meta, slice_spec, value)
1510            .await?
1511        {
1512            return Ok(());
1513        }
1514    }
1515
1516    let payload = read_array_payload_async(root, &meta).await?;
1517    let mut next_payload = payload.clone();
1518    if let Some(slice_spec) = slice_spec {
1519        next_payload = write_slice_payload(&payload, slice_spec, value)?;
1520    } else {
1521        let (shape, values) = value_to_tensor_shape_values(value)?;
1522        next_payload.shape = shape;
1523        next_payload.values = values;
1524    }
1525
1526    let (payload_path, chunk_index_path) =
1527        write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
1528    if let Some(updated) = manifest.arrays.get_mut(array_name) {
1529        updated.shape = next_payload.shape.clone();
1530        updated.data_path = make_rel_data_path(root, &payload_path)?;
1531        updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
1532    }
1533    Ok(())
1534}
1535
/// Attempt a chunk-granular slice write: only chunks intersecting the target
/// slice are loaded, patched element-by-element, rewritten, and re-hashed.
///
/// Returns `Ok(false)` when the array has no usable on-disk chunk index
/// (the caller then falls back to rewriting the whole payload) and
/// `Ok(true)` once the write has been applied through the chunk store.
async fn apply_slice_write_chunked_async(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    meta: &DataArrayMeta,
    slice_spec: &Value,
    value: &Value,
) -> BuiltinResult<bool> {
    // No chunk index recorded in the manifest -> array is not chunked.
    let Some(index_rel_path) = &meta.chunk_index_path else {
        return Ok(false);
    };
    let index_path = root.join(index_rel_path);
    // Index file missing on disk -> treat as not chunked.
    if runmat_filesystem::metadata_async(&index_path)
        .await
        .is_err()
    {
        return Ok(false);
    }
    // Validate that the right-hand side exactly matches the slice extent.
    let ranges = parse_slice_spec(slice_spec, &meta.shape)?;
    let rhs_shape: Vec<usize> = ranges
        .iter()
        .map(|r| r.end.saturating_sub(r.start))
        .collect();
    let (actual_rhs_shape, rhs_values) = value_to_tensor_shape_values(value)?;
    if actual_rhs_shape != rhs_shape {
        return Err(data_error(format!(
            "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
            actual_rhs_shape, rhs_shape
        )));
    }

    // Load and parse the chunk index (JSON).
    let index_bytes = runmat_filesystem::read_async(&index_path)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to read chunk index '{}': {err}",
                index_path.display()
            ))
        })?;
    let mut chunk_index: DataChunkIndex = serde_json::from_slice(&index_bytes).map_err(|err| {
        data_error(format!(
            "failed to parse chunk index '{}': {err}",
            index_path.display()
        ))
    })?;

    // Map chunk keys to positions so existing entries can be updated in place.
    let mut pos_by_key = HashMap::new();
    for (idx, entry) in chunk_index.chunks.iter().enumerate() {
        pos_by_key.insert(entry.key.clone(), idx);
    }
    let touched = touched_chunk_coords(&ranges, &meta.chunk_shape, &meta.shape);
    let mut upload_batch = Vec::<(DataChunkDescriptor, Vec<u8>)>::new();

    for coords in touched {
        let key = chunk_key(&coords);
        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
        let chunk_extent = chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape);
        // Intersection of the requested slice with this chunk's extent.
        let intersection = chunk_intersection(&ranges, &chunk_start, &chunk_extent);
        if intersection.is_empty() {
            continue;
        }

        // Load the existing chunk payload, or initialize a fresh one.
        let (entry_index, existed, mut entry, mut chunk_payload) = load_or_init_chunk(
            root,
            array_name,
            &key,
            &coords,
            &chunk_extent,
            &pos_by_key,
            &chunk_index,
        )
        .await?;

        // Walk every coordinate inside the intersection (odometer-style via
        // `advance_index`), copying rhs values into the chunk buffer.
        let mut local = vec![0usize; intersection.len()];
        let intersection_shape: Vec<usize> = intersection
            .iter()
            .map(|r| r.end.saturating_sub(r.start))
            .collect();
        loop {
            // `global` is the absolute array coordinate for this step.
            let mut global = Vec::with_capacity(intersection.len());
            for dim in 0..intersection.len() {
                global.push(intersection[dim].start + local[dim]);
            }
            // Translate to rhs-relative and chunk-relative coordinates.
            let rhs_index: Vec<usize> = global
                .iter()
                .enumerate()
                .map(|(dim, g)| g.saturating_sub(ranges[dim].start))
                .collect();
            let chunk_index_local: Vec<usize> = global
                .iter()
                .enumerate()
                .map(|(dim, g)| g.saturating_sub(chunk_start[dim]))
                .collect();
            let rhs_linear = linear_index_column_major(&rhs_index, &rhs_shape)?;
            let chunk_linear = linear_index_column_major(&chunk_index_local, &chunk_extent)?;
            chunk_payload.values[chunk_linear] = rhs_values[rhs_linear];
            if !advance_index(&mut local, &intersection_shape) {
                break;
            }
        }

        // Persist the patched chunk payload.
        let chunk_bytes = serde_json::to_vec(&chunk_payload)
            .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
        let chunk_path = root.join(&entry.data_path);
        runmat_filesystem::write_async(&chunk_path, &chunk_bytes)
            .await
            .map_err(|err| {
                data_error(format!(
                    "failed to write chunk payload '{}': {err}",
                    chunk_path.display()
                ))
            })?;

        // Refresh the index entry's geometry, sizes, and content hash.
        entry.coords = coords.clone();
        entry.shape = chunk_extent.clone();
        entry.bytes_raw = chunk_bytes.len() as u64;
        entry.bytes_stored = chunk_bytes.len() as u64;
        entry.hash = sha256_hex(&chunk_bytes);
        if existed {
            chunk_index.chunks[entry_index] = entry.clone();
        } else {
            chunk_index.chunks.push(entry.clone());
            pos_by_key.insert(key.clone(), chunk_index.chunks.len() - 1);
        }
        // Queue the chunk for the (possibly remote) upload batch.
        upload_batch.push((
            DataChunkDescriptor {
                key: key.clone(),
                object_id: entry.object_id.clone(),
                hash: entry.hash.clone(),
                bytes_raw: entry.bytes_raw,
                bytes_stored: entry.bytes_stored,
            },
            chunk_bytes,
        ));
    }

    maybe_upload_chunk_batch_async(root, array_name, upload_batch).await?;
    tracing::info!(
        target: "runmat.data",
        dataset = %root.display(),
        array = array_name,
        touched_chunks = chunk_index.chunks.len(),
        "chunked slice write committed"
    );
    // Write the updated chunk index back to disk.
    let index_write = serde_json::to_vec(&chunk_index)
        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
    runmat_filesystem::write_async(&index_path, &index_write)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to write chunk index '{}': {err}",
                index_path.display()
            ))
        })?;

    // Slice writes never change the array shape; re-assert the manifest entry.
    if let Some(updated) = manifest.arrays.get_mut(array_name) {
        updated.shape = meta.shape.clone();
        updated.chunk_index_path = Some(index_rel_path.clone());
    }
    Ok(true)
}
1697
1698fn value_to_tensor_shape_values(value: &Value) -> BuiltinResult<(Vec<usize>, Vec<f64>)> {
1699    match value {
1700        Value::Tensor(t) => Ok((t.shape.clone(), t.data.clone())),
1701        Value::Num(n) => Ok((vec![1, 1], vec![*n])),
1702        Value::Int(i) => Ok((vec![1, 1], vec![i.to_i64() as f64])),
1703        _ => Err(data_error(
1704            "DataArray.write supports tensor or numeric scalar values",
1705        )),
1706    }
1707}
1708
/// Half-open index range `[start, end)` along one array dimension.
///
/// Built by `parse_slice_spec`/`parse_dim_range`; those parsers only produce
/// ranges with `end >= start` (an empty dimension yields `start == end == 0`).
#[derive(Clone, Copy, Debug)]
struct DimRange {
    // Zero-based inclusive start index.
    start: usize,
    // Zero-based exclusive end index.
    end: usize,
}
1714
1715fn read_slice_payload(
1716    payload: &DataArrayPayload,
1717    slice_spec: &Value,
1718) -> BuiltinResult<DataArrayPayload> {
1719    let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
1720    let out_shape: Vec<usize> = ranges
1721        .iter()
1722        .map(|r| r.end.saturating_sub(r.start))
1723        .collect();
1724    let mut out_values = Vec::new();
1725    let mut out_index = vec![0usize; out_shape.len()];
1726    loop {
1727        let source_index: Vec<usize> = out_index
1728            .iter()
1729            .enumerate()
1730            .map(|(dim, idx)| ranges[dim].start + *idx)
1731            .collect();
1732        let linear = linear_index_column_major(&source_index, &payload.shape)?;
1733        out_values.push(payload.values[linear]);
1734
1735        if !advance_index(&mut out_index, &out_shape) {
1736            break;
1737        }
1738    }
1739    Ok(DataArrayPayload {
1740        dtype: payload.dtype.clone(),
1741        shape: out_shape,
1742        values: out_values,
1743    })
1744}
1745
1746fn write_slice_payload(
1747    payload: &DataArrayPayload,
1748    slice_spec: &Value,
1749    rhs: &Value,
1750) -> BuiltinResult<DataArrayPayload> {
1751    let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
1752    let target_shape: Vec<usize> = ranges
1753        .iter()
1754        .map(|r| r.end.saturating_sub(r.start))
1755        .collect();
1756    let (rhs_shape, rhs_values) = value_to_tensor_shape_values(rhs)?;
1757    if rhs_shape != target_shape {
1758        return Err(data_error(format!(
1759            "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
1760            rhs_shape, target_shape
1761        )));
1762    }
1763
1764    let mut next = payload.values.clone();
1765    let mut rhs_index = vec![0usize; target_shape.len()];
1766    let mut rhs_linear = 0usize;
1767    loop {
1768        let target_index: Vec<usize> = rhs_index
1769            .iter()
1770            .enumerate()
1771            .map(|(dim, idx)| ranges[dim].start + *idx)
1772            .collect();
1773        let target_linear = linear_index_column_major(&target_index, &payload.shape)?;
1774        next[target_linear] = rhs_values[rhs_linear];
1775        rhs_linear += 1;
1776
1777        if !advance_index(&mut rhs_index, &target_shape) {
1778            break;
1779        }
1780    }
1781
1782    Ok(DataArrayPayload {
1783        dtype: payload.dtype.clone(),
1784        shape: payload.shape.clone(),
1785        values: next,
1786    })
1787}
1788
1789fn parse_slice_spec(slice_spec: &Value, shape: &[usize]) -> BuiltinResult<Vec<DimRange>> {
1790    match slice_spec {
1791        Value::Cell(cell) => {
1792            if cell.data.is_empty() {
1793                return Err(data_error("INVALID_SLICE: empty slice specification"));
1794            }
1795            let mut ranges = Vec::with_capacity(shape.len());
1796            for (dim, extent) in shape.iter().enumerate() {
1797                if let Some(item) = cell.data.get(dim).map(|v| &**v) {
1798                    ranges.push(parse_dim_range(item, *extent)?);
1799                } else {
1800                    ranges.push(DimRange {
1801                        start: 0,
1802                        end: *extent,
1803                    });
1804                }
1805            }
1806            Ok(ranges)
1807        }
1808        Value::String(s) if s == ":" => Ok(shape
1809            .iter()
1810            .map(|extent| DimRange {
1811                start: 0,
1812                end: *extent,
1813            })
1814            .collect()),
1815        _ => Err(data_error(
1816            "INVALID_SLICE: slice must be a cell spec like {1:10, :} or ':'",
1817        )),
1818    }
1819}
1820
1821fn parse_dim_range(value: &Value, extent: usize) -> BuiltinResult<DimRange> {
1822    if extent == 0 {
1823        return Ok(DimRange { start: 0, end: 0 });
1824    }
1825    match value {
1826        Value::String(s) if s == ":" => Ok(DimRange {
1827            start: 0,
1828            end: extent,
1829        }),
1830        Value::Num(n) => {
1831            let idx = (*n as isize) - 1;
1832            if idx < 0 || idx as usize >= extent {
1833                return Err(data_error("INVALID_SLICE: index out of bounds"));
1834            }
1835            Ok(DimRange {
1836                start: idx as usize,
1837                end: idx as usize + 1,
1838            })
1839        }
1840        Value::Int(i) => {
1841            let idx = i.to_i64() - 1;
1842            if idx < 0 || idx as usize >= extent {
1843                return Err(data_error("INVALID_SLICE: index out of bounds"));
1844            }
1845            Ok(DimRange {
1846                start: idx as usize,
1847                end: idx as usize + 1,
1848            })
1849        }
1850        Value::Tensor(t) if t.data.len() == 2 => {
1851            let start = (t.data[0] as isize) - 1;
1852            let end_inclusive = (t.data[1] as isize) - 1;
1853            if start < 0 || end_inclusive < start || end_inclusive as usize >= extent {
1854                return Err(data_error("INVALID_SLICE: range out of bounds"));
1855            }
1856            Ok(DimRange {
1857                start: start as usize,
1858                end: end_inclusive as usize + 1,
1859            })
1860        }
1861        _ => Err(data_error(
1862            "INVALID_SLICE: dimension must be ':', scalar index, or [start end] range",
1863        )),
1864    }
1865}
1866
1867fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
1868    if index.len() != shape.len() {
1869        return Err(data_error("INVALID_SLICE: rank mismatch"));
1870    }
1871    let mut stride = 1usize;
1872    let mut linear = 0usize;
1873    for (idx, extent) in index.iter().zip(shape.iter()) {
1874        if *idx >= *extent {
1875            return Err(data_error("INVALID_SLICE: index out of bounds"));
1876        }
1877        linear += idx * stride;
1878        stride = stride.saturating_mul(*extent);
1879    }
1880    Ok(linear)
1881}
1882
/// Step `index` to the next coordinate in column-major order (dimension 0
/// varies fastest), wrapping like an odometer.
///
/// Returns `false` once the index has wrapped past the last coordinate (it is
/// reset to all zeros), or immediately for an empty shape.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    let mut dim = 0;
    while dim < shape.len() {
        index[dim] += 1;
        if index[dim] < shape[dim] {
            return true;
        }
        // This dimension overflowed: reset it and carry into the next one.
        index[dim] = 0;
        dim += 1;
    }
    false
}
1896
/// Render chunk grid coordinates as a dot-separated key, e.g. `[0, 2]` ->
/// `"0.2"`. Empty coordinates produce an empty key.
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (i, coord) in coords.iter().enumerate() {
        if i > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
1904
/// Convert chunk grid coordinates into the element-space start offset of that
/// chunk. Missing or zero chunk extents are treated as 1 so every dimension
/// has a positive stride.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(coords.len());
    for (dim, &coord) in coords.iter().enumerate() {
        let chunk = match chunk_shape.get(dim) {
            Some(&extent) => extent.max(1),
            None => 1,
        };
        starts.push(coord * chunk);
    }
    starts
}
1912
/// Compute the actual extent of a chunk beginning at `start`: the nominal
/// chunk size, clipped against the array bounds so edge chunks come out
/// shorter. Missing/zero chunk extents are treated as 1.
fn chunk_extent_for_start(start: &[usize], chunk_shape: &[usize], shape: &[usize]) -> Vec<usize> {
    let mut extents = Vec::with_capacity(start.len());
    for (dim, &offset) in start.iter().enumerate() {
        let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
        let end = shape[dim].min(offset + chunk);
        extents.push(end.saturating_sub(offset));
    }
    extents
}
1924
1925fn chunk_intersection(
1926    ranges: &[DimRange],
1927    chunk_start: &[usize],
1928    chunk_extent: &[usize],
1929) -> Vec<DimRange> {
1930    let mut out = Vec::with_capacity(ranges.len());
1931    for dim in 0..ranges.len() {
1932        let c_start = chunk_start[dim];
1933        let c_end = c_start + chunk_extent[dim];
1934        let start = ranges[dim].start.max(c_start);
1935        let end = ranges[dim].end.min(c_end);
1936        if start >= end {
1937            return Vec::new();
1938        }
1939        out.push(DimRange { start, end });
1940    }
1941    out
1942}
1943
1944fn touched_chunk_coords(
1945    ranges: &[DimRange],
1946    chunk_shape: &[usize],
1947    shape: &[usize],
1948) -> Vec<Vec<usize>> {
1949    let mut span = Vec::with_capacity(ranges.len());
1950    let mut begin = Vec::with_capacity(ranges.len());
1951    for dim in 0..ranges.len() {
1952        if shape[dim] == 0 {
1953            return Vec::new();
1954        }
1955        let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
1956        let first = ranges[dim].start / chunk;
1957        let last = (ranges[dim].end.saturating_sub(1)) / chunk;
1958        begin.push(first);
1959        span.push(last.saturating_sub(first) + 1);
1960    }
1961    let mut local = vec![0usize; span.len()];
1962    let mut out = Vec::new();
1963    loop {
1964        out.push(
1965            local
1966                .iter()
1967                .enumerate()
1968                .map(|(dim, v)| begin[dim] + *v)
1969                .collect::<Vec<_>>(),
1970        );
1971        if !advance_index(&mut local, &span) {
1972            break;
1973        }
1974    }
1975    out
1976}
1977
1978async fn maybe_upload_chunk_batch_async(
1979    root: &std::path::Path,
1980    array_name: &str,
1981    batch: Vec<(DataChunkDescriptor, Vec<u8>)>,
1982) -> BuiltinResult<()> {
1983    if batch.is_empty() {
1984        return Ok(());
1985    }
1986    let request = DataChunkUploadRequest {
1987        dataset_path: root.to_string_lossy().to_string(),
1988        array: array_name.to_string(),
1989        chunks: batch.iter().map(|(d, _)| d.clone()).collect(),
1990    };
1991    let targets = match runmat_filesystem::data_chunk_upload_targets_async(&request).await {
1992        Ok(targets) => targets,
1993        Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
1994        Err(err) => {
1995            return Err(data_error(format!(
1996                "failed to request data chunk upload targets: {err}"
1997            )))
1998        }
1999    };
2000    for (descriptor, bytes) in batch {
2001        let target = targets
2002            .iter()
2003            .find(|t| t.key == descriptor.key)
2004            .ok_or_else(|| {
2005                data_error(format!(
2006                    "missing upload target for chunk '{}'",
2007                    descriptor.key
2008                ))
2009            })?;
2010        runmat_filesystem::data_upload_chunk_async(target, &bytes)
2011            .await
2012            .map_err(|err| {
2013                data_error(format!(
2014                    "failed to upload chunk '{}': {err}",
2015                    descriptor.key
2016                ))
2017            })?;
2018        tracing::info!(
2019            target: "runmat.data",
2020            dataset = %root.display(),
2021            array = array_name,
2022            chunk_key = descriptor.key,
2023            bytes = bytes.len(),
2024            "chunk upload completed"
2025        );
2026    }
2027    Ok(())
2028}
2029
/// Dataset-relative storage path for one chunk object:
/// `arrays/<array>/chunks/<object_id>.json`.
fn chunk_rel_path(array_name: &str, object_id: &str) -> String {
    let mut path = String::with_capacity(
        "arrays//chunks/.json".len() + array_name.len() + object_id.len(),
    );
    path.push_str("arrays/");
    path.push_str(array_name);
    path.push_str("/chunks/");
    path.push_str(object_id);
    path.push_str(".json");
    path
}
2033
2034async fn load_or_init_chunk(
2035    root: &std::path::Path,
2036    array_name: &str,
2037    key: &str,
2038    coords: &[usize],
2039    chunk_extent: &[usize],
2040    pos_by_key: &HashMap<String, usize>,
2041    chunk_index: &DataChunkIndex,
2042) -> BuiltinResult<(usize, bool, DataChunkIndexEntry, DataArrayPayload)> {
2043    if let Some(index) = pos_by_key.get(key).copied() {
2044        let entry = chunk_index
2045            .chunks
2046            .get(index)
2047            .cloned()
2048            .ok_or_else(|| data_error(format!("chunk index missing key '{key}'")))?;
2049        let bytes = runmat_filesystem::read_async(root.join(&entry.data_path))
2050            .await
2051            .map_err(|err| {
2052                data_error(format!(
2053                    "failed to read chunk payload '{}': {err}",
2054                    entry.data_path
2055                ))
2056            })?;
2057        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
2058            data_error(format!(
2059                "failed to parse chunk payload '{}': {err}",
2060                entry.data_path
2061            ))
2062        })?;
2063        return Ok((index, true, entry, payload));
2064    }
2065
2066    let object_id = format!("obj_{}", key.replace('.', "_"));
2067    let entry = DataChunkIndexEntry {
2068        key: key.to_string(),
2069        object_id: object_id.clone(),
2070        hash: String::new(),
2071        bytes_raw: 0,
2072        bytes_stored: 0,
2073        coords: coords.to_vec(),
2074        shape: chunk_extent.to_vec(),
2075        data_path: chunk_rel_path(array_name, &object_id),
2076    };
2077    let payload = DataArrayPayload {
2078        dtype: "f64".to_string(),
2079        shape: chunk_extent.to_vec(),
2080        values: vec![0.0; chunk_extent.iter().copied().product()],
2081    };
2082    Ok((chunk_index.chunks.len(), false, entry, payload))
2083}
2084
2085fn attrs_to_struct(attrs: &BTreeMap<String, serde_json::Value>) -> Value {
2086    let mut out = StructValue::new();
2087    for (k, v) in attrs {
2088        out.fields.insert(k.clone(), json_to_value(v));
2089    }
2090    Value::Struct(out)
2091}
2092
2093fn value_to_json(value: &Value) -> serde_json::Value {
2094    match value {
2095        Value::String(s) => serde_json::Value::String(s.clone()),
2096        Value::CharArray(ca) => serde_json::Value::String(ca.to_string()),
2097        Value::Num(n) => serde_json::json!(n),
2098        Value::Int(i) => serde_json::json!(i.to_i64()),
2099        Value::Bool(b) => serde_json::json!(b),
2100        _ => serde_json::Value::String(format!("{value:?}")),
2101    }
2102}
2103
2104fn json_to_value(value: &serde_json::Value) -> Value {
2105    match value {
2106        serde_json::Value::Bool(b) => Value::Bool(*b),
2107        serde_json::Value::Number(n) => Value::Num(n.as_f64().unwrap_or_default()),
2108        serde_json::Value::String(s) => Value::String(s.clone()),
2109        serde_json::Value::Array(arr) => {
2110            let vals = arr.iter().map(json_to_value).collect::<Vec<_>>();
2111            crate::make_cell(vals.clone(), 1, vals.len())
2112                .unwrap_or_else(|_| Value::String("<invalid-array>".to_string()))
2113        }
2114        serde_json::Value::Object(map) => {
2115            let mut s = StructValue::new();
2116            for (k, v) in map {
2117                s.fields.insert(k.clone(), json_to_value(v));
2118            }
2119            Value::Struct(s)
2120        }
2121        serde_json::Value::Null => Value::String("".to_string()),
2122    }
2123}
2124
2125#[cfg(test)]
2126mod tests {
2127    use super::*;
2128    use crate::dispatcher::call_builtin;
2129    use async_trait::async_trait;
2130    use axum::extract::{Query, State};
2131    use axum::http::{HeaderMap, StatusCode};
2132    use axum::routing::{post, put};
2133    use axum::{Json, Router};
2134    use runmat_builtins::CellArray;
2135    use runmat_filesystem::data_contract::{
2136        DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
2137    };
2138    use runmat_filesystem::{
2139        DirEntry, FileHandle, FsMetadata, FsProvider, NativeFsProvider, OpenFlags,
2140    };
2141    use serde::Deserialize;
2142    use std::path::Path;
2143    use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
2144    use tokio::runtime::Runtime;
2145    use tokio::sync::oneshot;
2146
2147    fn serial_test_guard() -> MutexGuard<'static, ()> {
2148        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2149        LOCK.get_or_init(|| Mutex::new(()))
2150            .lock()
2151            .expect("data test serial lock poisoned")
2152    }
2153
    /// Test `FsProvider` that records every uploaded chunk key in memory while
    /// delegating all real filesystem work to `NativeFsProvider`.
    #[derive(Default)]
    struct CountingDataUploadProvider {
        inner: NativeFsProvider,
        // Keys pushed by `data_upload_chunk`; exposed to tests via
        // `uploaded_keys()`.
        uploaded_keys: Arc<Mutex<Vec<String>>>,
    }

    /// Test `FsProvider` that performs chunk-upload negotiation against a real
    /// HTTP endpoint (the local axum server) with a blocking reqwest client,
    /// delegating ordinary filesystem calls to `NativeFsProvider`.
    struct HttpDataUploadProvider {
        inner: NativeFsProvider,
        // Base URL of the upload server, e.g. "http://127.0.0.1:<port>".
        base_url: String,
        client: reqwest::blocking::Client,
    }
2165
    impl HttpDataUploadProvider {
        /// Build a provider that talks to the test upload server at `base_url`.
        fn new(base_url: String) -> Self {
            Self {
                inner: NativeFsProvider,
                base_url,
                client: reqwest::blocking::Client::new(),
            }
        }
    }
2175
    // HTTP-backed test provider: all standard filesystem operations delegate
    // straight to `NativeFsProvider`; only the two data-chunk hooks at the end
    // talk to the local axum upload server.
    #[async_trait(?Send)]
    impl FsProvider for HttpDataUploadProvider {
        fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
            self.inner.open(path, flags)
        }

        async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
            self.inner.read(path).await
        }

        async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
            self.inner.write(path, data).await
        }

        async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_file(path).await
        }

        async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.metadata(path).await
        }

        async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.symlink_metadata(path).await
        }

        async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
            self.inner.read_dir(path).await
        }

        async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
            self.inner.canonicalize(path).await
        }

        async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir(path).await
        }

        async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir_all(path).await
        }

        async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir(path).await
        }

        async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir_all(path).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
            self.inner.set_readonly(path, readonly).await
        }

        async fn data_manifest_descriptor(
            &self,
            request: &DataManifestRequest,
        ) -> std::io::Result<DataManifestDescriptor> {
            self.inner.data_manifest_descriptor(request).await
        }

        // POSTs the upload request to the test server and decodes the returned
        // target list; any transport or non-2xx failure is surfaced as an
        // `io::Error`.
        async fn data_chunk_upload_targets(
            &self,
            request: &DataChunkUploadRequest,
        ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
            #[derive(Deserialize)]
            struct UploadTargetsResponse {
                targets: Vec<DataChunkUploadTarget>,
            }
            let url = format!("{}/data/chunks/upload-targets", self.base_url);
            let response = self
                .client
                .post(url)
                .json(request)
                .send()
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            if !response.status().is_success() {
                return Err(std::io::Error::other(format!(
                    "upload targets request failed: {}",
                    response.status()
                )));
            }
            let parsed: UploadTargetsResponse = response
                .json()
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            Ok(parsed.targets)
        }

        // Uploads one chunk body using the method/headers from the negotiated
        // target. The synthetic `upload://<key>` scheme produced by the test
        // server is rewritten to the server's real `/upload?key=` endpoint.
        async fn data_upload_chunk(
            &self,
            target: &DataChunkUploadTarget,
            data: &[u8],
        ) -> std::io::Result<()> {
            let upload_url = if let Some(key) = target.upload_url.strip_prefix("upload://") {
                format!("{}/upload?key={}", self.base_url, key)
            } else {
                target.upload_url.clone()
            };
            let method = reqwest::Method::from_bytes(target.method.as_bytes())
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            let mut request = self.client.request(method, &upload_url);
            for (k, v) in &target.headers {
                request = request.header(k, v);
            }
            let response = request
                .body(data.to_vec())
                .send()
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            if !response.status().is_success() {
                return Err(std::io::Error::other(format!(
                    "chunk upload failed: {}",
                    response.status()
                )));
            }
            Ok(())
        }
    }
2297
    /// Shared state for the axum test server: the chunk keys it has accepted.
    #[derive(Clone, Default)]
    struct UploadHarness {
        uploads: Arc<Mutex<Vec<String>>>,
    }

    /// Query parameters for the test `/upload` endpoint (`?key=...`).
    #[derive(Deserialize)]
    struct UploadChunkQuery {
        key: String,
    }
2307
2308    async fn upload_targets_handler(
2309        Json(req): Json<DataChunkUploadRequest>,
2310    ) -> Result<Json<serde_json::Value>, StatusCode> {
2311        let targets = req
2312            .chunks
2313            .iter()
2314            .map(|chunk| {
2315                serde_json::json!({
2316                    "key": chunk.key,
2317                    "method": "PUT",
2318                    "upload_url": format!("upload://{}", chunk.key),
2319                    "headers": {
2320                        "x-runmat-hash": chunk.hash,
2321                    }
2322                })
2323            })
2324            .collect::<Vec<_>>();
2325        Ok(Json(serde_json::json!({ "targets": targets })))
2326    }
2327
2328    async fn upload_handler(
2329        State(harness): State<UploadHarness>,
2330        Query(query): Query<UploadChunkQuery>,
2331        headers: HeaderMap,
2332        body: axum::body::Bytes,
2333    ) -> Result<(), StatusCode> {
2334        if body.is_empty() {
2335            return Err(StatusCode::BAD_REQUEST);
2336        }
2337        if headers.get("x-runmat-hash").is_none() {
2338            return Err(StatusCode::BAD_REQUEST);
2339        }
2340        let mut guard = harness.uploads.lock().expect("uploads lock poisoned");
2341        guard.push(query.key);
2342        Ok(())
2343    }
2344
    /// Spin up the local axum upload server on an ephemeral localhost port.
    ///
    /// Returns the server's base URL, the shared list of uploaded chunk keys,
    /// the tokio runtime driving the server (must stay alive for the test's
    /// duration), and a oneshot sender that triggers graceful shutdown.
    fn spawn_upload_server() -> (
        String,
        Arc<Mutex<Vec<String>>>,
        Runtime,
        oneshot::Sender<()>,
    ) {
        let harness = UploadHarness::default();
        let uploads = Arc::clone(&harness.uploads);
        let runtime = Runtime::new().expect("tokio runtime");
        // Bind inside the runtime so the listener is registered with this
        // runtime's reactor; port 0 lets the OS pick a free port.
        let (addr, shutdown_tx) = runtime.block_on(async move {
            let listener = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
                .await
                .expect("bind upload server");
            let addr = listener.local_addr().expect("local addr");
            let app = Router::new()
                .route("/data/chunks/upload-targets", post(upload_targets_handler))
                .route("/upload", put(upload_handler))
                .with_state(harness);
            let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
            let server = axum::serve(listener, app).with_graceful_shutdown(async {
                let _ = shutdown_rx.await;
            });
            // Detach the server task; it stops once `shutdown_tx` fires (or
            // the runtime is dropped).
            tokio::spawn(async move {
                let _ = server.await;
            });
            (addr, shutdown_tx)
        });
        (format!("http://{}", addr), uploads, runtime, shutdown_tx)
    }
2374
    impl CountingDataUploadProvider {
        /// Shared handle to the chunk keys recorded by `data_upload_chunk`.
        fn uploaded_keys(&self) -> Arc<Mutex<Vec<String>>> {
            Arc::clone(&self.uploaded_keys)
        }
    }
2380
    // Counting test provider: all standard filesystem operations delegate to
    // `NativeFsProvider`; the two data-chunk hooks fabricate in-memory upload
    // targets and record which keys were "uploaded" without any network.
    #[async_trait(?Send)]
    impl FsProvider for CountingDataUploadProvider {
        fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
            self.inner.open(path, flags)
        }

        async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
            self.inner.read(path).await
        }

        async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
            self.inner.write(path, data).await
        }

        async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_file(path).await
        }

        async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.metadata(path).await
        }

        async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.symlink_metadata(path).await
        }

        async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
            self.inner.read_dir(path).await
        }

        async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
            self.inner.canonicalize(path).await
        }

        async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir(path).await
        }

        async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir_all(path).await
        }

        async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir(path).await
        }

        async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir_all(path).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
            self.inner.set_readonly(path, readonly).await
        }

        async fn data_manifest_descriptor(
            &self,
            request: &DataManifestRequest,
        ) -> std::io::Result<DataManifestDescriptor> {
            self.inner.data_manifest_descriptor(request).await
        }

        // Fabricate one synthetic `count://` target per requested chunk so the
        // upload path always proceeds.
        async fn data_chunk_upload_targets(
            &self,
            request: &DataChunkUploadRequest,
        ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
            Ok(request
                .chunks
                .iter()
                .map(|chunk| DataChunkUploadTarget {
                    key: chunk.key.clone(),
                    method: "PUT".to_string(),
                    upload_url: format!("count://{}", chunk.object_id),
                    headers: std::collections::HashMap::new(),
                })
                .collect())
        }

        // Record the uploaded chunk key; recovers from a poisoned mutex so a
        // panicking sibling test cannot cascade failures here.
        async fn data_upload_chunk(
            &self,
            target: &DataChunkUploadTarget,
            _data: &[u8],
        ) -> std::io::Result<()> {
            let mut guard = match self.uploaded_keys.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            guard.push(target.key.clone());
            Ok(())
        }
    }
2475
    /// End-to-end smoke test: create a dataset with one 2x2 f64 array, write a
    /// full tensor into it via the builtin dispatcher, and read it back.
    #[test]
    fn create_open_write_read_dataset() {
        let _serial = serial_test_guard();
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("sample.data").to_string_lossy().to_string();

        // Build the schema struct: { arrays: { temperature: { dtype, shape } } }.
        let mut array_meta = StructValue::new();
        array_meta
            .fields
            .insert("dtype".to_string(), Value::String("f64".to_string()));
        array_meta.fields.insert(
            "shape".to_string(),
            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
        );
        let mut arrays = StructValue::new();
        arrays
            .fields
            .insert("temperature".to_string(), Value::Struct(array_meta));
        let mut schema = StructValue::new();
        schema
            .fields
            .insert("arrays".to_string(), Value::Struct(arrays));

        // data.create(path, schema, {}) — empty cell for trailing options.
        let ds = call_builtin(
            "data.create",
            &[
                Value::String(path.clone()),
                Value::Struct(schema),
                Value::Cell(runmat_builtins::CellArray::new(vec![], 1, 0).expect("cell")),
            ],
        )
        .expect("create dataset");

        // Fetch the array handle and write a full 2x2 tensor through it.
        let arr = call_builtin(
            "Dataset.array",
            &[ds, Value::String("temperature".to_string())],
        )
        .expect("dataset array");
        let write_tensor = Tensor::new(vec![1.0, 2.0, 3.0, 4.0], vec![2, 2]).expect("write tensor");
        call_builtin(
            "DataArray.write",
            &[arr.clone(), Value::Tensor(write_tensor)],
        )
        .expect("write array");

        // Round-trip: the read must return the exact shape and data written.
        let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
        let Value::Tensor(t) = read_back else {
            panic!("expected tensor");
        };
        assert_eq!(t.shape, vec![2, 2]);
        assert_eq!(t.data, vec![1.0, 2.0, 3.0, 4.0]);
    }
2528
2529    #[test]
2530    fn write_and_read_slice_payload() {
2531        let _serial = serial_test_guard();
2532        let dir = tempfile::tempdir().expect("tempdir");
2533        let path = dir.path().join("slice.data").to_string_lossy().to_string();
2534
2535        let mut array_meta = StructValue::new();
2536        array_meta
2537            .fields
2538            .insert("dtype".to_string(), Value::String("f64".to_string()));
2539        array_meta.fields.insert(
2540            "shape".to_string(),
2541            Value::Tensor(Tensor::new(vec![3.0, 3.0], vec![1, 2]).expect("shape tensor")),
2542        );
2543        let mut arrays = StructValue::new();
2544        arrays
2545            .fields
2546            .insert("temperature".to_string(), Value::Struct(array_meta));
2547        let mut schema = StructValue::new();
2548        schema
2549            .fields
2550            .insert("arrays".to_string(), Value::Struct(arrays));
2551
2552        let ds = call_builtin(
2553            "data.create",
2554            &[
2555                Value::String(path.clone()),
2556                Value::Struct(schema),
2557                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2558            ],
2559        )
2560        .expect("create dataset");
2561        let arr = call_builtin(
2562            "Dataset.array",
2563            &[ds, Value::String("temperature".to_string())],
2564        )
2565        .expect("dataset array");
2566
2567        let slice = Value::Cell(
2568            CellArray::new(
2569                vec![
2570                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2571                    Value::String(":".to_string()),
2572                ],
2573                1,
2574                2,
2575            )
2576            .expect("slice cell"),
2577        );
2578        let rhs = Value::Tensor(
2579            Tensor::new(vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0], vec![2, 3]).expect("rhs"),
2580        );
2581        call_builtin("DataArray.write", &[arr.clone(), slice.clone(), rhs]).expect("slice write");
2582
2583        let read_back = call_builtin("DataArray.read", &[arr.clone(), slice]).expect("slice read");
2584        let Value::Tensor(t) = read_back else {
2585            panic!("expected tensor");
2586        };
2587        assert_eq!(t.shape, vec![2, 3]);
2588        assert_eq!(t.data, vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0]);
2589    }
2590
2591    #[test]
2592    fn slice_write_updates_only_touched_chunks() {
2593        let _serial = serial_test_guard();
2594        let dir = tempfile::tempdir().expect("tempdir");
2595        let path = dir
2596            .path()
2597            .join("chunked.data")
2598            .to_string_lossy()
2599            .to_string();
2600
2601        let mut array_meta = StructValue::new();
2602        array_meta
2603            .fields
2604            .insert("dtype".to_string(), Value::String("f64".to_string()));
2605        array_meta.fields.insert(
2606            "shape".to_string(),
2607            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2608        );
2609        array_meta.fields.insert(
2610            "chunk".to_string(),
2611            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2612        );
2613        let mut arrays = StructValue::new();
2614        arrays
2615            .fields
2616            .insert("temperature".to_string(), Value::Struct(array_meta));
2617        let mut schema = StructValue::new();
2618        schema
2619            .fields
2620            .insert("arrays".to_string(), Value::Struct(arrays));
2621
2622        let ds = call_builtin(
2623            "data.create",
2624            &[
2625                Value::String(path.clone()),
2626                Value::Struct(schema),
2627                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2628            ],
2629        )
2630        .expect("create dataset");
2631        let arr = call_builtin(
2632            "Dataset.array",
2633            &[ds, Value::String("temperature".to_string())],
2634        )
2635        .expect("dataset array");
2636
2637        let full = Value::Tensor(
2638            Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4]).expect("full tensor"),
2639        );
2640        call_builtin("DataArray.write", &[arr.clone(), full]).expect("initial write");
2641
2642        let root = std::path::PathBuf::from(&path);
2643        let untouched_path = root.join("arrays/temperature/chunks/obj_1_1.json");
2644        let touched_path = root.join("arrays/temperature/chunks/obj_0_0.json");
2645        let untouched_before =
2646            futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
2647                .expect("read untouched before");
2648        let touched_before =
2649            futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
2650                .expect("read touched before");
2651
2652        let slice = Value::Cell(
2653            CellArray::new(
2654                vec![
2655                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2656                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2657                ],
2658                1,
2659                2,
2660            )
2661            .expect("slice cell"),
2662        );
2663        let rhs =
2664            Value::Tensor(Tensor::new(vec![99.0, 98.0, 97.0, 96.0], vec![2, 2]).expect("rhs"));
2665        call_builtin("DataArray.write", &[arr.clone(), slice, rhs]).expect("slice write");
2666
2667        let untouched_after =
2668            futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
2669                .expect("read untouched after");
2670        let touched_after =
2671            futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
2672                .expect("read touched after");
2673        assert_eq!(untouched_before, untouched_after);
2674        assert_ne!(touched_before, touched_after);
2675    }
2676
2677    #[test]
2678    fn slice_write_uploads_only_touched_chunk_targets() {
2679        let _serial = serial_test_guard();
2680        let provider = Arc::new(CountingDataUploadProvider::default());
2681        let uploaded = provider.uploaded_keys();
2682        let _guard = runmat_filesystem::replace_provider(provider);
2683
2684        let dir = tempfile::tempdir().expect("tempdir");
2685        let path = dir
2686            .path()
2687            .join("remote-chunked.data")
2688            .to_string_lossy()
2689            .to_string();
2690
2691        let mut array_meta = StructValue::new();
2692        array_meta
2693            .fields
2694            .insert("dtype".to_string(), Value::String("f64".to_string()));
2695        array_meta.fields.insert(
2696            "shape".to_string(),
2697            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2698        );
2699        array_meta.fields.insert(
2700            "chunk".to_string(),
2701            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2702        );
2703        let mut arrays = StructValue::new();
2704        arrays
2705            .fields
2706            .insert("temperature".to_string(), Value::Struct(array_meta));
2707        let mut schema = StructValue::new();
2708        schema
2709            .fields
2710            .insert("arrays".to_string(), Value::Struct(arrays));
2711
2712        let ds = call_builtin(
2713            "data.create",
2714            &[
2715                Value::String(path.clone()),
2716                Value::Struct(schema),
2717                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2718            ],
2719        )
2720        .expect("create dataset");
2721        let arr = call_builtin(
2722            "Dataset.array",
2723            &[ds, Value::String("temperature".to_string())],
2724        )
2725        .expect("dataset array");
2726
2727        call_builtin(
2728            "DataArray.write",
2729            &[
2730                arr.clone(),
2731                Value::Tensor(
2732                    Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
2733                        .expect("full tensor"),
2734                ),
2735            ],
2736        )
2737        .expect("initial write");
2738
2739        let manifest =
2740            futures::executor::block_on(crate::data::read_manifest_async(&dataset_root(&path)))
2741                .expect("manifest after initial write");
2742        let meta = manifest
2743            .arrays
2744            .get("temperature")
2745            .expect("temperature meta");
2746        let chunk_index_path =
2747            dataset_root(&path).join(meta.chunk_index_path.clone().expect("chunk index path"));
2748        assert!(
2749            futures::executor::block_on(runmat_filesystem::metadata_async(&chunk_index_path))
2750                .is_ok()
2751        );
2752
2753        {
2754            let mut keys = uploaded.lock().expect("uploaded keys lock");
2755            keys.clear();
2756        }
2757
2758        let slice = Value::Cell(
2759            CellArray::new(
2760                vec![
2761                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2762                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2763                ],
2764                1,
2765                2,
2766            )
2767            .expect("slice cell"),
2768        );
2769        let rhs = Value::Tensor(Tensor::new(vec![9.0, 8.0, 7.0, 6.0], vec![2, 2]).expect("rhs"));
2770        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
2771
2772        let keys = uploaded.lock().expect("uploaded keys lock");
2773        assert_eq!(keys.as_slice(), ["0.0".to_string()].as_slice());
2774    }
2775
2776    #[test]
2777    fn slice_write_uploads_expected_cross_boundary_chunk_targets() {
2778        let _serial = serial_test_guard();
2779        let provider = Arc::new(CountingDataUploadProvider::default());
2780        let uploaded = provider.uploaded_keys();
2781        let _guard = runmat_filesystem::replace_provider(provider);
2782
2783        let dir = tempfile::tempdir().expect("tempdir");
2784        let path = dir
2785            .path()
2786            .join("remote-chunked-boundary.data")
2787            .to_string_lossy()
2788            .to_string();
2789
2790        let mut array_meta = StructValue::new();
2791        array_meta
2792            .fields
2793            .insert("dtype".to_string(), Value::String("f64".to_string()));
2794        array_meta.fields.insert(
2795            "shape".to_string(),
2796            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2797        );
2798        array_meta.fields.insert(
2799            "chunk".to_string(),
2800            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2801        );
2802        let mut arrays = StructValue::new();
2803        arrays
2804            .fields
2805            .insert("temperature".to_string(), Value::Struct(array_meta));
2806        let mut schema = StructValue::new();
2807        schema
2808            .fields
2809            .insert("arrays".to_string(), Value::Struct(arrays));
2810
2811        let ds = call_builtin(
2812            "data.create",
2813            &[
2814                Value::String(path.clone()),
2815                Value::Struct(schema),
2816                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2817            ],
2818        )
2819        .expect("create dataset");
2820        let arr = call_builtin(
2821            "Dataset.array",
2822            &[ds, Value::String("temperature".to_string())],
2823        )
2824        .expect("dataset array");
2825
2826        call_builtin(
2827            "DataArray.write",
2828            &[
2829                arr.clone(),
2830                Value::Tensor(
2831                    Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
2832                        .expect("full tensor"),
2833                ),
2834            ],
2835        )
2836        .expect("initial write");
2837        {
2838            let mut keys = uploaded.lock().expect("uploaded keys lock");
2839            keys.clear();
2840        }
2841
2842        let slice = Value::Cell(
2843            CellArray::new(
2844                vec![
2845                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
2846                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
2847                ],
2848                1,
2849                2,
2850            )
2851            .expect("slice cell"),
2852        );
2853        let rhs =
2854            Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
2855        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
2856
2857        let mut keys = uploaded.lock().expect("uploaded keys lock").clone();
2858        keys.sort();
2859        keys.dedup();
2860        assert_eq!(
2861            keys.as_slice(),
2862            [
2863                "0.0".to_string(),
2864                "0.1".to_string(),
2865                "1.0".to_string(),
2866                "1.1".to_string(),
2867            ]
2868            .as_slice()
2869        );
2870    }
2871
    // End-to-end check that a chunked slice write reaches the HTTP data
    // endpoints, with one upload recorded per touched chunk key.
    #[test]
    fn slice_write_hits_http_server_data_endpoints_with_expected_keys() {
        let _serial = serial_test_guard();
        // Spin up a local upload server; `uploads` collects the chunk keys it
        // receives, and `runtime` keeps the server task alive for the test.
        let (base_url, uploads, runtime, shutdown_tx) = spawn_upload_server();
        let provider = Arc::new(HttpDataUploadProvider::new(base_url));
        // NOTE(review): `_guard` must stay bound for the whole test so the
        // replaced filesystem provider is restored on drop; do not shorten it
        // to `_`, which would drop it immediately.
        let _guard = runmat_filesystem::replace_provider(provider);

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir
            .path()
            .join("http-endpoint.data")
            .to_string_lossy()
            .to_string();

        // Schema: one 4x4 f64 array "temperature" stored as 2x2 chunks, i.e.
        // four chunk targets in total.
        let mut array_meta = StructValue::new();
        array_meta
            .fields
            .insert("dtype".to_string(), Value::String("f64".to_string()));
        array_meta.fields.insert(
            "shape".to_string(),
            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
        );
        array_meta.fields.insert(
            "chunk".to_string(),
            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
        );
        let mut arrays = StructValue::new();
        arrays
            .fields
            .insert("temperature".to_string(), Value::Struct(array_meta));
        let mut schema = StructValue::new();
        schema
            .fields
            .insert("arrays".to_string(), Value::Struct(arrays));

        let ds = call_builtin(
            "data.create",
            &[
                Value::String(path.clone()),
                Value::Struct(schema),
                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
            ],
        )
        .expect("create dataset");
        let arr = call_builtin(
            "Dataset.array",
            &[ds, Value::String("temperature".to_string())],
        )
        .expect("dataset array");

        // Populate the whole array so all chunks exist before the slice write.
        call_builtin(
            "DataArray.write",
            &[
                arr.clone(),
                Value::Tensor(
                    Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
                        .expect("full tensor"),
                ),
            ],
        )
        .expect("initial write");

        // Discard the uploads from the initial write; only the slice write's
        // traffic is under test.
        {
            let mut keys = uploads.lock().expect("uploads lock");
            keys.clear();
        }

        // Indices (2..3, 2..3) cross the 2x2 chunk boundaries, so the write
        // touches all four chunk targets.
        let slice = Value::Cell(
            CellArray::new(
                vec![
                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
                ],
                1,
                2,
            )
            .expect("slice cell"),
        );
        let rhs =
            Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");

        // Sort + dedup before comparing: the order the server observes the
        // uploads in is presumably not guaranteed — TODO confirm whether
        // duplicates can legitimately occur (e.g. retries).
        let mut keys = uploads.lock().expect("uploads lock").clone();
        keys.sort();
        keys.dedup();
        assert_eq!(
            keys.as_slice(),
            [
                "0.0".to_string(),
                "0.1".to_string(),
                "1.0".to_string(),
                "1.1".to_string(),
            ]
            .as_slice()
        );

        // Teardown: signal shutdown (best-effort), then drop the runtime to
        // stop the server task.
        let _ = shutdown_tx.send(());
        drop(runtime);
    }
2971
2972    #[test]
2973    fn tx_create_resize_fill_and_delete_array() {
2974        let _serial = serial_test_guard();
2975        let dir = tempfile::tempdir().expect("tempdir");
2976        let path = dir.path().join("tx-ops.data").to_string_lossy().to_string();
2977
2978        let mut arrays = StructValue::new();
2979        let mut array_meta = StructValue::new();
2980        array_meta
2981            .fields
2982            .insert("dtype".to_string(), Value::String("f64".to_string()));
2983        array_meta.fields.insert(
2984            "shape".to_string(),
2985            Value::Tensor(Tensor::new(vec![1.0, 1.0], vec![1, 2]).expect("shape tensor")),
2986        );
2987        arrays
2988            .fields
2989            .insert("base".to_string(), Value::Struct(array_meta));
2990        let mut schema = StructValue::new();
2991        schema
2992            .fields
2993            .insert("arrays".to_string(), Value::Struct(arrays));
2994
2995        let ds = call_builtin(
2996            "data.create",
2997            &[
2998                Value::String(path.clone()),
2999                Value::Struct(schema),
3000                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3001            ],
3002        )
3003        .expect("create dataset");
3004
3005        let tx = call_builtin("Dataset.begin", &[ds]).expect("begin tx");
3006        let mut new_meta = StructValue::new();
3007        new_meta
3008            .fields
3009            .insert("dtype".to_string(), Value::String("f64".to_string()));
3010        new_meta.fields.insert(
3011            "shape".to_string(),
3012            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
3013        );
3014        call_builtin(
3015            "DataTransaction.create_array",
3016            &[
3017                tx.clone(),
3018                Value::String("new_array".to_string()),
3019                Value::Struct(new_meta),
3020            ],
3021        )
3022        .expect("create array in tx");
3023        call_builtin(
3024            "DataTransaction.resize",
3025            &[
3026                tx.clone(),
3027                Value::String("new_array".to_string()),
3028                Value::Tensor(Tensor::new(vec![3.0, 1.0], vec![1, 2]).expect("shape tensor")),
3029            ],
3030        )
3031        .expect("resize array in tx");
3032        call_builtin(
3033            "DataTransaction.fill",
3034            &[
3035                tx.clone(),
3036                Value::String("new_array".to_string()),
3037                Value::Num(7.0),
3038            ],
3039        )
3040        .expect("fill array in tx");
3041        call_builtin(
3042            "DataTransaction.delete_array",
3043            &[tx.clone(), Value::String("base".to_string())],
3044        )
3045        .expect("delete array in tx");
3046        call_builtin("DataTransaction.commit", &[tx]).expect("commit tx");
3047
3048        let ds = call_builtin(
3049            "data.open",
3050            &[
3051                Value::String(path),
3052                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3053            ],
3054        )
3055        .expect("open dataset");
3056        let has_base = call_builtin(
3057            "Dataset.has_array",
3058            &[ds.clone(), Value::String("base".to_string())],
3059        )
3060        .expect("has base");
3061        assert_eq!(has_base, Value::Bool(false));
3062        let arr = call_builtin(
3063            "Dataset.array",
3064            &[ds, Value::String("new_array".to_string())],
3065        )
3066        .expect("new array");
3067        let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
3068        let Value::Tensor(t) = read_back else {
3069            panic!("expected tensor");
3070        };
3071        assert_eq!(t.shape, vec![3, 1]);
3072        assert_eq!(t.data, vec![7.0, 7.0, 7.0]);
3073    }
3074}