Skip to main content

runmat_runtime/builtins/io/data/
mod.rs

1//! Cloud-ready dataset persistence builtins (`data.*`).
2
3use std::collections::BTreeMap;
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use runmat_builtins::{
8    BuiltinCompletionPolicy, BuiltinDescriptor, BuiltinErrorDescriptor, BuiltinOutputMode,
9    BuiltinParamArity, BuiltinParamDescriptor, BuiltinParamType, BuiltinSignatureDescriptor,
10    ObjectInstance, StructValue, Tensor, Value,
11};
12use runmat_filesystem::data_contract::{DataChunkDescriptor, DataChunkUploadRequest};
13use runmat_macros::runtime_builtin;
14
15use crate::builtins::common::spec::{
16    BroadcastSemantics, BuiltinFusionSpec, BuiltinGpuSpec, ConstantStrategy, GpuOpKind,
17    ReductionNaN, ResidencyPolicy, ShapeRequirements,
18};
19use crate::data::{
20    array_object, data_error, dataset_object, dataset_root, ensure_manifest_sequence,
21    get_object_prop, manifest_path, manifest_version_token, now_rfc3339, parse_schema,
22    parse_string, read_array_payload_async, read_manifest_async, remove_tx, sha256_hex, start_tx,
23    transaction_object, with_tx, with_tx_mut, write_array_payload_async, write_manifest_async,
24    DataArrayMeta, DataArrayPayload, DataChunkIndex, DataChunkIndexEntry, DataManifest,
25    PendingCreateArray, PendingFill, PendingResize, PendingWrite, TxnStatus,
26};
27use crate::{make_cell, BuiltinResult};
28
29#[runmat_macros::register_gpu_spec(builtin_path = "crate::builtins::io::data")]
30pub const GPU_SPEC: BuiltinGpuSpec = BuiltinGpuSpec {
31    name: "data.*",
32    op_kind: GpuOpKind::Custom("io-data"),
33    supported_precisions: &[],
34    broadcast: BroadcastSemantics::None,
35    provider_hooks: &[],
36    constant_strategy: ConstantStrategy::InlineLiteral,
37    residency: ResidencyPolicy::GatherImmediately,
38    nan_mode: ReductionNaN::Include,
39    two_pass_threshold: None,
40    workgroup_size: None,
41    accepts_nan_mode: false,
42    notes: "Dataset operations are host I/O and metadata orchestration.",
43};
44
45#[runmat_macros::register_fusion_spec(builtin_path = "crate::builtins::io::data")]
46pub const FUSION_SPEC: BuiltinFusionSpec = BuiltinFusionSpec {
47    name: "data.*",
48    shape: ShapeRequirements::Any,
49    constant_strategy: ConstantStrategy::InlineLiteral,
50    elementwise: None,
51    reduction: None,
52    emits_nan: false,
53    notes: "Data builtins are side-effecting and not fusible.",
54};
55
56const DATA_ERROR_INVALID_ARGUMENT: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
57    code: "RM.DATA.INVALID_ARGUMENT",
58    identifier: Some("RunMat:data:InvalidArgument"),
59    when: "Arguments, receiver object, or option grammar are invalid for the requested data API.",
60    message: "data: invalid argument",
61};
62
63const DATA_ERROR_NOT_FOUND: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
64    code: "RM.DATA.NOT_FOUND",
65    identifier: Some("RunMat:data:NotFound"),
66    when: "Referenced dataset/array/transaction object is missing or cannot be resolved.",
67    message: "data: requested object not found",
68};
69
70const DATA_ERROR_INTERNAL: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
71    code: "RM.DATA.INTERNAL",
72    identifier: Some("RunMat:data:Internal"),
73    when: "Filesystem/manifest/chunk processing fails unexpectedly during data API execution.",
74    message: "data: internal operation failed",
75};
76
77const DATA_DESCRIPTOR_ERRORS: [BuiltinErrorDescriptor; 3] = [
78    DATA_ERROR_INVALID_ARGUMENT,
79    DATA_ERROR_NOT_FOUND,
80    DATA_ERROR_INTERNAL,
81];
82
83const OUT_DATASET: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
84    name: "ds",
85    ty: BuiltinParamType::Any,
86    arity: BuiltinParamArity::Required,
87    default: None,
88    description: "Dataset handle object.",
89}];
90
91const OUT_ARRAY: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
92    name: "arr",
93    ty: BuiltinParamType::Any,
94    arity: BuiltinParamArity::Required,
95    default: None,
96    description: "DataArray handle object.",
97}];
98
99const OUT_TX: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
100    name: "tx",
101    ty: BuiltinParamType::Any,
102    arity: BuiltinParamArity::Required,
103    default: None,
104    description: "DataTransaction handle object.",
105}];
106
107const OUT_BOOL: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
108    name: "ok",
109    ty: BuiltinParamType::LogicalArray,
110    arity: BuiltinParamArity::Required,
111    default: None,
112    description: "Logical success/result flag.",
113}];
114
115const OUT_STRING: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
116    name: "s",
117    ty: BuiltinParamType::StringScalar,
118    arity: BuiltinParamArity::Required,
119    default: None,
120    description: "String scalar result.",
121}];
122
123const OUT_STRUCT: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
124    name: "S",
125    ty: BuiltinParamType::Any,
126    arity: BuiltinParamArity::Required,
127    default: None,
128    description: "Struct result.",
129}];
130
131const OUT_CELL: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
132    name: "C",
133    ty: BuiltinParamType::Any,
134    arity: BuiltinParamArity::Required,
135    default: None,
136    description: "Cell-array result.",
137}];
138
139const OUT_VALUE: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
140    name: "value",
141    ty: BuiltinParamType::Any,
142    arity: BuiltinParamArity::Required,
143    default: None,
144    description: "Value result.",
145}];
146
147const OUT_TENSOR: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
148    name: "X",
149    ty: BuiltinParamType::NumericArray,
150    arity: BuiltinParamArity::Required,
151    default: None,
152    description: "Numeric tensor result.",
153}];
154
155const IN_PATH_SCHEMA_REST: [BuiltinParamDescriptor; 3] = [
156    BuiltinParamDescriptor {
157        name: "path",
158        ty: BuiltinParamType::StringScalar,
159        arity: BuiltinParamArity::Required,
160        default: None,
161        description: "Dataset path (.data).",
162    },
163    BuiltinParamDescriptor {
164        name: "schema",
165        ty: BuiltinParamType::Any,
166        arity: BuiltinParamArity::Required,
167        default: None,
168        description: "Dataset schema struct.",
169    },
170    BuiltinParamDescriptor {
171        name: "options",
172        ty: BuiltinParamType::Any,
173        arity: BuiltinParamArity::Variadic,
174        default: None,
175        description: "Reserved name/value options.",
176    },
177];
178
179const IN_PATH_REST: [BuiltinParamDescriptor; 2] = [
180    BuiltinParamDescriptor {
181        name: "path",
182        ty: BuiltinParamType::StringScalar,
183        arity: BuiltinParamArity::Required,
184        default: None,
185        description: "Dataset path (.data).",
186    },
187    BuiltinParamDescriptor {
188        name: "options",
189        ty: BuiltinParamType::Any,
190        arity: BuiltinParamArity::Variadic,
191        default: None,
192        description: "Reserved name/value options.",
193    },
194];
195
196const IN_FROM_TO_REST: [BuiltinParamDescriptor; 3] = [
197    BuiltinParamDescriptor {
198        name: "fromPath",
199        ty: BuiltinParamType::StringScalar,
200        arity: BuiltinParamArity::Required,
201        default: None,
202        description: "Source dataset path.",
203    },
204    BuiltinParamDescriptor {
205        name: "toPath",
206        ty: BuiltinParamType::StringScalar,
207        arity: BuiltinParamArity::Required,
208        default: None,
209        description: "Destination dataset path.",
210    },
211    BuiltinParamDescriptor {
212        name: "options",
213        ty: BuiltinParamType::Any,
214        arity: BuiltinParamArity::Variadic,
215        default: None,
216        description: "Reserved name/value options.",
217    },
218];
219
220const IN_PATH_FORMAT_TARGET_REST: [BuiltinParamDescriptor; 4] = [
221    BuiltinParamDescriptor {
222        name: "path",
223        ty: BuiltinParamType::StringScalar,
224        arity: BuiltinParamArity::Required,
225        default: None,
226        description: "Dataset path (.data).",
227    },
228    BuiltinParamDescriptor {
229        name: "format",
230        ty: BuiltinParamType::StringScalar,
231        arity: BuiltinParamArity::Required,
232        default: None,
233        description: "Format token (currently 'data').",
234    },
235    BuiltinParamDescriptor {
236        name: "targetPath",
237        ty: BuiltinParamType::StringScalar,
238        arity: BuiltinParamArity::Required,
239        default: None,
240        description: "Target dataset path.",
241    },
242    BuiltinParamDescriptor {
243        name: "options",
244        ty: BuiltinParamType::Any,
245        arity: BuiltinParamArity::Variadic,
246        default: None,
247        description: "Reserved name/value options.",
248    },
249];
250
251const IN_PREFIX_REST: [BuiltinParamDescriptor; 2] = [
252    BuiltinParamDescriptor {
253        name: "prefix",
254        ty: BuiltinParamType::StringScalar,
255        arity: BuiltinParamArity::Required,
256        default: None,
257        description: "Filesystem prefix to scan for datasets.",
258    },
259    BuiltinParamDescriptor {
260        name: "options",
261        ty: BuiltinParamType::Any,
262        arity: BuiltinParamArity::Variadic,
263        default: None,
264        description: "Reserved name/value options.",
265    },
266];
267
268const IN_BASE: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
269    name: "obj",
270    ty: BuiltinParamType::Any,
271    arity: BuiltinParamArity::Required,
272    default: None,
273    description: "Receiver object.",
274}];
275
276const IN_BASE_NAME: [BuiltinParamDescriptor; 2] = [
277    BuiltinParamDescriptor {
278        name: "obj",
279        ty: BuiltinParamType::Any,
280        arity: BuiltinParamArity::Required,
281        default: None,
282        description: "Receiver object.",
283    },
284    BuiltinParamDescriptor {
285        name: "name",
286        ty: BuiltinParamType::StringScalar,
287        arity: BuiltinParamArity::Required,
288        default: None,
289        description: "Name key/array identifier.",
290    },
291];
292
293const IN_BASE_KEY_DEFAULT: [BuiltinParamDescriptor; 3] = [
294    BuiltinParamDescriptor {
295        name: "obj",
296        ty: BuiltinParamType::Any,
297        arity: BuiltinParamArity::Required,
298        default: None,
299        description: "Receiver object.",
300    },
301    BuiltinParamDescriptor {
302        name: "key",
303        ty: BuiltinParamType::StringScalar,
304        arity: BuiltinParamArity::Required,
305        default: None,
306        description: "Attribute key.",
307    },
308    BuiltinParamDescriptor {
309        name: "defaultValue",
310        ty: BuiltinParamType::Any,
311        arity: BuiltinParamArity::Optional,
312        default: Some("0"),
313        description: "Fallback value when key is absent.",
314    },
315];
316
317const IN_BASE_KEY_VALUE: [BuiltinParamDescriptor; 3] = [
318    BuiltinParamDescriptor {
319        name: "obj",
320        ty: BuiltinParamType::Any,
321        arity: BuiltinParamArity::Required,
322        default: None,
323        description: "Receiver object.",
324    },
325    BuiltinParamDescriptor {
326        name: "key",
327        ty: BuiltinParamType::StringScalar,
328        arity: BuiltinParamArity::Required,
329        default: None,
330        description: "Attribute key.",
331    },
332    BuiltinParamDescriptor {
333        name: "value",
334        ty: BuiltinParamType::Any,
335        arity: BuiltinParamArity::Required,
336        default: None,
337        description: "Attribute value.",
338    },
339];
340
341const IN_BASE_ATTRS: [BuiltinParamDescriptor; 2] = [
342    BuiltinParamDescriptor {
343        name: "obj",
344        ty: BuiltinParamType::Any,
345        arity: BuiltinParamArity::Required,
346        default: None,
347        description: "Receiver object.",
348    },
349    BuiltinParamDescriptor {
350        name: "attrs",
351        ty: BuiltinParamType::Any,
352        arity: BuiltinParamArity::Required,
353        default: None,
354        description: "Struct of attribute updates.",
355    },
356];
357
358const IN_BASE_LABEL_REST: [BuiltinParamDescriptor; 3] = [
359    BuiltinParamDescriptor {
360        name: "obj",
361        ty: BuiltinParamType::Any,
362        arity: BuiltinParamArity::Required,
363        default: None,
364        description: "Receiver object.",
365    },
366    BuiltinParamDescriptor {
367        name: "label",
368        ty: BuiltinParamType::StringScalar,
369        arity: BuiltinParamArity::Required,
370        default: None,
371        description: "Snapshot label.",
372    },
373    BuiltinParamDescriptor {
374        name: "options",
375        ty: BuiltinParamType::Any,
376        arity: BuiltinParamArity::Variadic,
377        default: None,
378        description: "Reserved name/value options.",
379    },
380];
381
382const IN_BASE_REST: [BuiltinParamDescriptor; 2] = [
383    BuiltinParamDescriptor {
384        name: "obj",
385        ty: BuiltinParamType::Any,
386        arity: BuiltinParamArity::Required,
387        default: None,
388        description: "Receiver object.",
389    },
390    BuiltinParamDescriptor {
391        name: "options",
392        ty: BuiltinParamType::Any,
393        arity: BuiltinParamArity::Variadic,
394        default: None,
395        description: "Reserved name/value options.",
396    },
397];
398
399const IN_BASE_SLICE_OPTIONAL: [BuiltinParamDescriptor; 2] = [
400    BuiltinParamDescriptor {
401        name: "obj",
402        ty: BuiltinParamType::Any,
403        arity: BuiltinParamArity::Required,
404        default: None,
405        description: "DataArray receiver object.",
406    },
407    BuiltinParamDescriptor {
408        name: "sliceSpec",
409        ty: BuiltinParamType::Any,
410        arity: BuiltinParamArity::Optional,
411        default: None,
412        description: "Optional slice specification.",
413    },
414];
415
416const IN_BASE_VALUES: [BuiltinParamDescriptor; 2] = [
417    BuiltinParamDescriptor {
418        name: "obj",
419        ty: BuiltinParamType::Any,
420        arity: BuiltinParamArity::Required,
421        default: None,
422        description: "DataArray receiver object.",
423    },
424    BuiltinParamDescriptor {
425        name: "values",
426        ty: BuiltinParamType::Any,
427        arity: BuiltinParamArity::Required,
428        default: None,
429        description: "Full-array values payload.",
430    },
431];
432
433const IN_BASE_SLICE_VALUES: [BuiltinParamDescriptor; 3] = [
434    BuiltinParamDescriptor {
435        name: "obj",
436        ty: BuiltinParamType::Any,
437        arity: BuiltinParamArity::Required,
438        default: None,
439        description: "DataArray receiver object.",
440    },
441    BuiltinParamDescriptor {
442        name: "sliceSpec",
443        ty: BuiltinParamType::Any,
444        arity: BuiltinParamArity::Required,
445        default: None,
446        description: "Slice specification.",
447    },
448    BuiltinParamDescriptor {
449        name: "values",
450        ty: BuiltinParamType::Any,
451        arity: BuiltinParamArity::Required,
452        default: None,
453        description: "Slice values payload.",
454    },
455];
456
457const IN_BASE_NEW_SHAPE_REST: [BuiltinParamDescriptor; 3] = [
458    BuiltinParamDescriptor {
459        name: "obj",
460        ty: BuiltinParamType::Any,
461        arity: BuiltinParamArity::Required,
462        default: None,
463        description: "DataArray receiver object.",
464    },
465    BuiltinParamDescriptor {
466        name: "newShape",
467        ty: BuiltinParamType::Any,
468        arity: BuiltinParamArity::Required,
469        default: None,
470        description: "New array shape.",
471    },
472    BuiltinParamDescriptor {
473        name: "options",
474        ty: BuiltinParamType::Any,
475        arity: BuiltinParamArity::Variadic,
476        default: None,
477        description: "Reserved name/value options.",
478    },
479];
480
481const IN_BASE_VALUE_REST: [BuiltinParamDescriptor; 3] = [
482    BuiltinParamDescriptor {
483        name: "obj",
484        ty: BuiltinParamType::Any,
485        arity: BuiltinParamArity::Required,
486        default: None,
487        description: "DataArray receiver object.",
488    },
489    BuiltinParamDescriptor {
490        name: "value",
491        ty: BuiltinParamType::Any,
492        arity: BuiltinParamArity::Required,
493        default: None,
494        description: "Fill value.",
495    },
496    BuiltinParamDescriptor {
497        name: "options",
498        ty: BuiltinParamType::Any,
499        arity: BuiltinParamArity::Variadic,
500        default: None,
501        description: "Reserved name/value options.",
502    },
503];
504
505const IN_TX_WRITE: [BuiltinParamDescriptor; 5] = [
506    BuiltinParamDescriptor {
507        name: "tx",
508        ty: BuiltinParamType::Any,
509        arity: BuiltinParamArity::Required,
510        default: None,
511        description: "DataTransaction receiver.",
512    },
513    BuiltinParamDescriptor {
514        name: "arrayName",
515        ty: BuiltinParamType::StringScalar,
516        arity: BuiltinParamArity::Required,
517        default: None,
518        description: "Target array name.",
519    },
520    BuiltinParamDescriptor {
521        name: "sliceSpec",
522        ty: BuiltinParamType::Any,
523        arity: BuiltinParamArity::Required,
524        default: None,
525        description: "Slice specification.",
526    },
527    BuiltinParamDescriptor {
528        name: "values",
529        ty: BuiltinParamType::Any,
530        arity: BuiltinParamArity::Required,
531        default: None,
532        description: "Values payload.",
533    },
534    BuiltinParamDescriptor {
535        name: "options",
536        ty: BuiltinParamType::Any,
537        arity: BuiltinParamArity::Variadic,
538        default: None,
539        description: "Reserved name/value options.",
540    },
541];
542
543const IN_TX_ARRAY_SHAPE_REST: [BuiltinParamDescriptor; 4] = [
544    BuiltinParamDescriptor {
545        name: "tx",
546        ty: BuiltinParamType::Any,
547        arity: BuiltinParamArity::Required,
548        default: None,
549        description: "DataTransaction receiver.",
550    },
551    BuiltinParamDescriptor {
552        name: "arrayName",
553        ty: BuiltinParamType::StringScalar,
554        arity: BuiltinParamArity::Required,
555        default: None,
556        description: "Target array name.",
557    },
558    BuiltinParamDescriptor {
559        name: "newShape",
560        ty: BuiltinParamType::Any,
561        arity: BuiltinParamArity::Required,
562        default: None,
563        description: "New shape vector.",
564    },
565    BuiltinParamDescriptor {
566        name: "options",
567        ty: BuiltinParamType::Any,
568        arity: BuiltinParamArity::Variadic,
569        default: None,
570        description: "Reserved name/value options.",
571    },
572];
573
574const IN_TX_ARRAY_VALUE_SLICE_OPT: [BuiltinParamDescriptor; 4] = [
575    BuiltinParamDescriptor {
576        name: "tx",
577        ty: BuiltinParamType::Any,
578        arity: BuiltinParamArity::Required,
579        default: None,
580        description: "DataTransaction receiver.",
581    },
582    BuiltinParamDescriptor {
583        name: "arrayName",
584        ty: BuiltinParamType::StringScalar,
585        arity: BuiltinParamArity::Required,
586        default: None,
587        description: "Target array name.",
588    },
589    BuiltinParamDescriptor {
590        name: "value",
591        ty: BuiltinParamType::Any,
592        arity: BuiltinParamArity::Required,
593        default: None,
594        description: "Fill value.",
595    },
596    BuiltinParamDescriptor {
597        name: "sliceSpec",
598        ty: BuiltinParamType::Any,
599        arity: BuiltinParamArity::Optional,
600        default: None,
601        description: "Optional slice specification.",
602    },
603];
604
605const IN_TX_ARRAY_NAME: [BuiltinParamDescriptor; 2] = [
606    BuiltinParamDescriptor {
607        name: "tx",
608        ty: BuiltinParamType::Any,
609        arity: BuiltinParamArity::Required,
610        default: None,
611        description: "DataTransaction receiver.",
612    },
613    BuiltinParamDescriptor {
614        name: "arrayName",
615        ty: BuiltinParamType::StringScalar,
616        arity: BuiltinParamArity::Required,
617        default: None,
618        description: "Target array name.",
619    },
620];
621
622const IN_TX_ARRAY_META: [BuiltinParamDescriptor; 3] = [
623    BuiltinParamDescriptor {
624        name: "tx",
625        ty: BuiltinParamType::Any,
626        arity: BuiltinParamArity::Required,
627        default: None,
628        description: "DataTransaction receiver.",
629    },
630    BuiltinParamDescriptor {
631        name: "arrayName",
632        ty: BuiltinParamType::StringScalar,
633        arity: BuiltinParamArity::Required,
634        default: None,
635        description: "New array name.",
636    },
637    BuiltinParamDescriptor {
638        name: "meta",
639        ty: BuiltinParamType::Any,
640        arity: BuiltinParamArity::Required,
641        default: None,
642        description: "Array metadata struct.",
643    },
644];
645
646const IN_TX_COMMIT_REST: [BuiltinParamDescriptor; 2] = [
647    BuiltinParamDescriptor {
648        name: "tx",
649        ty: BuiltinParamType::Any,
650        arity: BuiltinParamArity::Required,
651        default: None,
652        description: "DataTransaction receiver.",
653    },
654    BuiltinParamDescriptor {
655        name: "options",
656        ty: BuiltinParamType::Any,
657        arity: BuiltinParamArity::Variadic,
658        default: None,
659        description: "Commit options (e.g. if_manifest).",
660    },
661];
662
/// Declares the descriptor pair for a builtin that has exactly one call signature.
///
/// Arguments, in order: the name of the public `BuiltinDescriptor` const, the name of the
/// private one-element signature array, the signature label, the input parameter list, and
/// the output parameter list. Every descriptor produced here uses `BuiltinOutputMode::Fixed`,
/// `BuiltinCompletionPolicy::Public`, and the shared `DATA_DESCRIPTOR_ERRORS` table.
macro_rules! one_sig_descriptor {
    (
        $descriptor:ident,
        $signatures:ident,
        $sig_label:expr,
        $sig_inputs:expr,
        $sig_outputs:expr
    ) => {
        const $signatures: [BuiltinSignatureDescriptor; 1] = [BuiltinSignatureDescriptor {
            label: $sig_label,
            inputs: $sig_inputs,
            outputs: $sig_outputs,
        }];
        pub const $descriptor: BuiltinDescriptor = BuiltinDescriptor {
            signatures: &$signatures,
            output_mode: BuiltinOutputMode::Fixed,
            completion_policy: BuiltinCompletionPolicy::Public,
            errors: &DATA_DESCRIPTOR_ERRORS,
        };
    };
}
678
679one_sig_descriptor!(
680    DATA_CREATE_DESCRIPTOR,
681    DATA_CREATE_SIGS,
682    "ds = data.create(path, schema, Name, Value, ...)",
683    &IN_PATH_SCHEMA_REST,
684    &OUT_DATASET
685);
686one_sig_descriptor!(
687    DATA_OPEN_DESCRIPTOR,
688    DATA_OPEN_SIGS,
689    "ds = data.open(path, Name, Value, ...)",
690    &IN_PATH_REST,
691    &OUT_DATASET
692);
693one_sig_descriptor!(
694    DATA_EXISTS_DESCRIPTOR,
695    DATA_EXISTS_SIGS,
696    "tf = data.exists(path)",
697    &IN_PATH_REST,
698    &OUT_BOOL
699);
700one_sig_descriptor!(
701    DATA_DELETE_DESCRIPTOR,
702    DATA_DELETE_SIGS,
703    "tf = data.delete(path, Name, Value, ...)",
704    &IN_PATH_REST,
705    &OUT_BOOL
706);
707one_sig_descriptor!(
708    DATA_COPY_DESCRIPTOR,
709    DATA_COPY_SIGS,
710    "tf = data.copy(fromPath, toPath, Name, Value, ...)",
711    &IN_FROM_TO_REST,
712    &OUT_BOOL
713);
714one_sig_descriptor!(
715    DATA_MOVE_DESCRIPTOR,
716    DATA_MOVE_SIGS,
717    "tf = data.move(fromPath, toPath, Name, Value, ...)",
718    &IN_FROM_TO_REST,
719    &OUT_BOOL
720);
721one_sig_descriptor!(
722    DATA_IMPORT_DESCRIPTOR,
723    DATA_IMPORT_SIGS,
724    "ds = data.import(path, format, sourcePath, Name, Value, ...)",
725    &IN_PATH_FORMAT_TARGET_REST,
726    &OUT_DATASET
727);
728one_sig_descriptor!(
729    DATA_EXPORT_DESCRIPTOR,
730    DATA_EXPORT_SIGS,
731    "tf = data.export(path, format, targetPath, Name, Value, ...)",
732    &IN_PATH_FORMAT_TARGET_REST,
733    &OUT_BOOL
734);
735one_sig_descriptor!(
736    DATA_LIST_DESCRIPTOR,
737    DATA_LIST_SIGS,
738    "C = data.list(prefix, Name, Value, ...)",
739    &IN_PREFIX_REST,
740    &OUT_CELL
741);
742one_sig_descriptor!(
743    DATA_INSPECT_DESCRIPTOR,
744    DATA_INSPECT_SIGS,
745    "S = data.inspect(path)",
746    &IN_PATH_REST,
747    &OUT_STRUCT
748);
749
750one_sig_descriptor!(
751    DATASET_PATH_DESCRIPTOR,
752    DATASET_PATH_SIGS,
753    "path = Dataset.path(ds)",
754    &IN_BASE,
755    &OUT_STRING
756);
757one_sig_descriptor!(
758    DATASET_ID_DESCRIPTOR,
759    DATASET_ID_SIGS,
760    "id = Dataset.id(ds)",
761    &IN_BASE,
762    &OUT_STRING
763);
764one_sig_descriptor!(
765    DATASET_VERSION_DESCRIPTOR,
766    DATASET_VERSION_SIGS,
767    "version = Dataset.version(ds)",
768    &IN_BASE,
769    &OUT_STRING
770);
771one_sig_descriptor!(
772    DATASET_ARRAYS_DESCRIPTOR,
773    DATASET_ARRAYS_SIGS,
774    "C = Dataset.arrays(ds)",
775    &IN_BASE,
776    &OUT_CELL
777);
778one_sig_descriptor!(
779    DATASET_HAS_ARRAY_DESCRIPTOR,
780    DATASET_HAS_ARRAY_SIGS,
781    "tf = Dataset.has_array(ds, name)",
782    &IN_BASE_NAME,
783    &OUT_BOOL
784);
785one_sig_descriptor!(
786    DATASET_ARRAY_DESCRIPTOR,
787    DATASET_ARRAY_SIGS,
788    "arr = Dataset.array(ds, name)",
789    &IN_BASE_NAME,
790    &OUT_ARRAY
791);
792one_sig_descriptor!(
793    DATASET_ATTRS_DESCRIPTOR,
794    DATASET_ATTRS_SIGS,
795    "S = Dataset.attrs(ds)",
796    &IN_BASE,
797    &OUT_STRUCT
798);
799one_sig_descriptor!(
800    DATASET_GET_ATTR_DESCRIPTOR,
801    DATASET_GET_ATTR_SIGS,
802    "value = Dataset.get_attr(ds, key, defaultValue)",
803    &IN_BASE_KEY_DEFAULT,
804    &OUT_VALUE
805);
806one_sig_descriptor!(
807    DATASET_SET_ATTR_DESCRIPTOR,
808    DATASET_SET_ATTR_SIGS,
809    "tf = Dataset.set_attr(ds, key, value)",
810    &IN_BASE_KEY_VALUE,
811    &OUT_BOOL
812);
813one_sig_descriptor!(
814    DATASET_SET_ATTRS_DESCRIPTOR,
815    DATASET_SET_ATTRS_SIGS,
816    "tf = Dataset.set_attrs(ds, attrs)",
817    &IN_BASE_ATTRS,
818    &OUT_BOOL
819);
820one_sig_descriptor!(
821    DATASET_BEGIN_DESCRIPTOR,
822    DATASET_BEGIN_SIGS,
823    "tx = Dataset.begin(ds, Name, Value, ...)",
824    &IN_BASE_REST,
825    &OUT_TX
826);
827one_sig_descriptor!(
828    DATASET_SNAPSHOT_DESCRIPTOR,
829    DATASET_SNAPSHOT_SIGS,
830    "snapshotPath = Dataset.snapshot(ds, label, Name, Value, ...)",
831    &IN_BASE_LABEL_REST,
832    &OUT_STRING
833);
834one_sig_descriptor!(
835    DATASET_REFRESH_DESCRIPTOR,
836    DATASET_REFRESH_SIGS,
837    "ds = Dataset.refresh(ds)",
838    &IN_BASE,
839    &OUT_DATASET
840);
841
842one_sig_descriptor!(
843    DATAARRAY_NAME_DESCRIPTOR,
844    DATAARRAY_NAME_SIGS,
845    "name = DataArray.name(arr)",
846    &IN_BASE,
847    &OUT_STRING
848);
849one_sig_descriptor!(
850    DATAARRAY_DTYPE_DESCRIPTOR,
851    DATAARRAY_DTYPE_SIGS,
852    "dtype = DataArray.dtype(arr)",
853    &IN_BASE,
854    &OUT_STRING
855);
856one_sig_descriptor!(
857    DATAARRAY_SHAPE_DESCRIPTOR,
858    DATAARRAY_SHAPE_SIGS,
859    "shape = DataArray.shape(arr)",
860    &IN_BASE,
861    &OUT_TENSOR
862);
863one_sig_descriptor!(
864    DATAARRAY_RANK_DESCRIPTOR,
865    DATAARRAY_RANK_SIGS,
866    "rank = DataArray.rank(arr)",
867    &IN_BASE,
868    &OUT_VALUE
869);
870one_sig_descriptor!(
871    DATAARRAY_CHUNK_SHAPE_DESCRIPTOR,
872    DATAARRAY_CHUNK_SHAPE_SIGS,
873    "chunkShape = DataArray.chunk_shape(arr)",
874    &IN_BASE,
875    &OUT_TENSOR
876);
877one_sig_descriptor!(
878    DATAARRAY_CODEC_DESCRIPTOR,
879    DATAARRAY_CODEC_SIGS,
880    "codec = DataArray.codec(arr)",
881    &IN_BASE,
882    &OUT_STRING
883);
884one_sig_descriptor!(
885    DATAARRAY_READ_DESCRIPTOR,
886    DATAARRAY_READ_SIGS,
887    "X = DataArray.read(arr, sliceSpec)",
888    &IN_BASE_SLICE_OPTIONAL,
889    &OUT_TENSOR
890);
891const DATAARRAY_WRITE_SIGS: [BuiltinSignatureDescriptor; 2] = [
892    BuiltinSignatureDescriptor {
893        label: "tf = DataArray.write(arr, values)",
894        inputs: &IN_BASE_VALUES,
895        outputs: &OUT_BOOL,
896    },
897    BuiltinSignatureDescriptor {
898        label: "tf = DataArray.write(arr, sliceSpec, values)",
899        inputs: &IN_BASE_SLICE_VALUES,
900        outputs: &OUT_BOOL,
901    },
902];
903pub const DATAARRAY_WRITE_DESCRIPTOR: BuiltinDescriptor = BuiltinDescriptor {
904    signatures: &DATAARRAY_WRITE_SIGS,
905    output_mode: BuiltinOutputMode::Fixed,
906    completion_policy: BuiltinCompletionPolicy::Public,
907    errors: &DATA_DESCRIPTOR_ERRORS,
908};
909one_sig_descriptor!(
910    DATAARRAY_RESIZE_DESCRIPTOR,
911    DATAARRAY_RESIZE_SIGS,
912    "tf = DataArray.resize(arr, newShape, Name, Value, ...)",
913    &IN_BASE_NEW_SHAPE_REST,
914    &OUT_BOOL
915);
916one_sig_descriptor!(
917    DATAARRAY_FILL_DESCRIPTOR,
918    DATAARRAY_FILL_SIGS,
919    "tf = DataArray.fill(arr, value, Name, Value, ...)",
920    &IN_BASE_VALUE_REST,
921    &OUT_BOOL
922);
923
924one_sig_descriptor!(
925    DATATX_ID_DESCRIPTOR,
926    DATATX_ID_SIGS,
927    "id = DataTransaction.id(tx)",
928    &IN_BASE,
929    &OUT_STRING
930);
931one_sig_descriptor!(
932    DATATX_WRITE_DESCRIPTOR,
933    DATATX_WRITE_SIGS_1,
934    "tf = DataTransaction.write(tx, arrayName, sliceSpec, values, Name, Value, ...)",
935    &IN_TX_WRITE,
936    &OUT_BOOL
937);
938one_sig_descriptor!(
939    DATATX_SET_ATTR_DESCRIPTOR,
940    DATATX_SET_ATTR_SIGS,
941    "tf = DataTransaction.set_attr(tx, key, value)",
942    &IN_BASE_KEY_VALUE,
943    &OUT_BOOL
944);
945one_sig_descriptor!(
946    DATATX_SET_ATTRS_DESCRIPTOR,
947    DATATX_SET_ATTRS_SIGS,
948    "tf = DataTransaction.set_attrs(tx, attrs)",
949    &IN_BASE_ATTRS,
950    &OUT_BOOL
951);
952one_sig_descriptor!(
953    DATATX_RESIZE_DESCRIPTOR,
954    DATATX_RESIZE_SIGS,
955    "tf = DataTransaction.resize(tx, arrayName, newShape, Name, Value, ...)",
956    &IN_TX_ARRAY_SHAPE_REST,
957    &OUT_BOOL
958);
959one_sig_descriptor!(
960    DATATX_FILL_DESCRIPTOR,
961    DATATX_FILL_SIGS,
962    "tf = DataTransaction.fill(tx, arrayName, value, sliceSpec)",
963    &IN_TX_ARRAY_VALUE_SLICE_OPT,
964    &OUT_BOOL
965);
966one_sig_descriptor!(
967    DATATX_DELETE_ARRAY_DESCRIPTOR,
968    DATATX_DELETE_ARRAY_SIGS,
969    "tf = DataTransaction.delete_array(tx, arrayName)",
970    &IN_TX_ARRAY_NAME,
971    &OUT_BOOL
972);
973one_sig_descriptor!(
974    DATATX_CREATE_ARRAY_DESCRIPTOR,
975    DATATX_CREATE_ARRAY_SIGS,
976    "tf = DataTransaction.create_array(tx, arrayName, meta)",
977    &IN_TX_ARRAY_META,
978    &OUT_BOOL
979);
980one_sig_descriptor!(
981    DATATX_COMMIT_DESCRIPTOR,
982    DATATX_COMMIT_SIGS,
983    "tf = DataTransaction.commit(tx, Name, Value, ...)",
984    &IN_TX_COMMIT_REST,
985    &OUT_BOOL
986);
987one_sig_descriptor!(
988    COMMIT_ALIAS_DESCRIPTOR,
989    COMMIT_ALIAS_SIGS,
990    "tf = commit(tx, Name, Value, ...)",
991    &IN_TX_COMMIT_REST,
992    &OUT_BOOL
993);
994one_sig_descriptor!(
995    DATATX_ABORT_DESCRIPTOR,
996    DATATX_ABORT_SIGS,
997    "tf = DataTransaction.abort(tx)",
998    &IN_BASE,
999    &OUT_BOOL
1000);
1001one_sig_descriptor!(
1002    DATATX_STATUS_DESCRIPTOR,
1003    DATATX_STATUS_SIGS,
1004    "status = DataTransaction.status(tx)",
1005    &IN_BASE,
1006    &OUT_STRING
1007);
1008
1009#[runtime_builtin(
1010    name = "data.create",
1011    category = "io/data",
1012    summary = "Create a typed dataset at a .data path.",
1013    keywords = "data,dataset,create,persistence",
1014    sink = true,
1015    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1016    descriptor(crate::builtins::io::data::DATA_CREATE_DESCRIPTOR),
1017    builtin_path = "crate::builtins::io::data"
1018)]
1019async fn data_create_builtin(
1020    path: Value,
1021    schema: Value,
1022    _rest: Vec<Value>,
1023) -> BuiltinResult<Value> {
1024    let path = parse_string(&path, "data.create path")?;
1025    let root = dataset_root(&path);
1026    let schema = parse_schema(&schema)?;
1027    let now = now_rfc3339();
1028    let mut arrays = BTreeMap::new();
1029    for (name, mut meta) in schema.arrays {
1030        let total = meta.shape.iter().copied().product::<usize>();
1031        let payload = DataArrayPayload {
1032            dtype: meta.dtype.clone(),
1033            shape: meta.shape.clone(),
1034            values: vec![0.0; total],
1035        };
1036        let (payload_path, chunk_index_path) =
1037            write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1038        meta.data_path = make_rel_data_path(&root, &payload_path)?;
1039        meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1040        arrays.insert(name, meta);
1041    }
1042
1043    let manifest = DataManifest {
1044        schema_version: 1,
1045        format: "runmat-data".to_string(),
1046        dataset_id: crate::data::new_dataset_id(),
1047        name: root.file_name().map(|v| v.to_string_lossy().to_string()),
1048        created_at: now.clone(),
1049        updated_at: now,
1050        arrays,
1051        attrs: BTreeMap::new(),
1052        txn_sequence: 0,
1053    };
1054    write_manifest_async(&root, &manifest).await?;
1055    Ok(dataset_object(&path, &manifest))
1056}
1057
1058#[runtime_builtin(
1059    name = "data.open",
1060    category = "io/data",
1061    summary = "Open a dataset handle from a .data path.",
1062    keywords = "data,dataset,open,persistence",
1063    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1064    descriptor(crate::builtins::io::data::DATA_OPEN_DESCRIPTOR),
1065    builtin_path = "crate::builtins::io::data"
1066)]
1067async fn data_open_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1068    let path = parse_string(&path, "data.open path")?;
1069    let root = dataset_root(&path);
1070    let manifest = read_manifest_async(&root).await?;
1071    let mut ds = dataset_object(&path, &manifest);
1072    hydrate_dataset_descriptor_async(&path, &mut ds).await;
1073    Ok(ds)
1074}
1075
1076#[runtime_builtin(
1077    name = "data.exists",
1078    category = "io/data",
1079    summary = "Check if dataset exists.",
1080    keywords = "data,dataset,exists",
1081    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1082    descriptor(crate::builtins::io::data::DATA_EXISTS_DESCRIPTOR),
1083    builtin_path = "crate::builtins::io::data"
1084)]
1085async fn data_exists_builtin(path: Value) -> BuiltinResult<Value> {
1086    let path = parse_string(&path, "data.exists path")?;
1087    let root = dataset_root(&path);
1088    let exists = runmat_filesystem::metadata_async(manifest_path(&root))
1089        .await
1090        .is_ok();
1091    Ok(Value::Bool(exists))
1092}
1093
1094#[runtime_builtin(
1095    name = "data.delete",
1096    category = "io/data",
1097    summary = "Delete a dataset path.",
1098    keywords = "data,dataset,delete",
1099    sink = true,
1100    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1101    descriptor(crate::builtins::io::data::DATA_DELETE_DESCRIPTOR),
1102    builtin_path = "crate::builtins::io::data"
1103)]
1104async fn data_delete_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1105    let path = parse_string(&path, "data.delete path")?;
1106    let root = dataset_root(&path);
1107    runmat_filesystem::remove_dir_all_async(&root)
1108        .await
1109        .map_err(|err| {
1110            data_error(format!(
1111                "data.delete: failed to remove '{}': {err}",
1112                root.display()
1113            ))
1114        })?;
1115    Ok(Value::Bool(true))
1116}
1117
1118#[runtime_builtin(
1119    name = "data.copy",
1120    category = "io/data",
1121    summary = "Copy dataset to new path.",
1122    keywords = "data,dataset,copy",
1123    sink = true,
1124    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1125    descriptor(crate::builtins::io::data::DATA_COPY_DESCRIPTOR),
1126    builtin_path = "crate::builtins::io::data"
1127)]
1128async fn data_copy_builtin(
1129    from_path: Value,
1130    to_path: Value,
1131    _rest: Vec<Value>,
1132) -> BuiltinResult<Value> {
1133    let from = parse_string(&from_path, "data.copy fromPath")?;
1134    let to = parse_string(&to_path, "data.copy toPath")?;
1135    copy_dir_recursive(&dataset_root(&from), &dataset_root(&to)).await?;
1136    Ok(Value::Bool(true))
1137}
1138
1139#[runtime_builtin(
1140    name = "data.move",
1141    category = "io/data",
1142    summary = "Move dataset to new path.",
1143    keywords = "data,dataset,move",
1144    sink = true,
1145    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1146    descriptor(crate::builtins::io::data::DATA_MOVE_DESCRIPTOR),
1147    builtin_path = "crate::builtins::io::data"
1148)]
1149async fn data_move_builtin(
1150    from_path: Value,
1151    to_path: Value,
1152    _rest: Vec<Value>,
1153) -> BuiltinResult<Value> {
1154    let from = parse_string(&from_path, "data.move fromPath")?;
1155    let to = parse_string(&to_path, "data.move toPath")?;
1156    runmat_filesystem::rename_async(dataset_root(&from), dataset_root(&to))
1157        .await
1158        .map_err(|err| {
1159            data_error(format!(
1160                "data.move: failed to move dataset '{from}' -> '{to}': {err}"
1161            ))
1162        })?;
1163    Ok(Value::Bool(true))
1164}
1165
1166#[runtime_builtin(
1167    name = "data.import",
1168    category = "io/data",
1169    summary = "Import an existing dataset file path.",
1170    keywords = "data,dataset,import",
1171    sink = true,
1172    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1173    descriptor(crate::builtins::io::data::DATA_IMPORT_DESCRIPTOR),
1174    builtin_path = "crate::builtins::io::data"
1175)]
1176async fn data_import_builtin(
1177    path: Value,
1178    format: Value,
1179    source_path: Value,
1180    _rest: Vec<Value>,
1181) -> BuiltinResult<Value> {
1182    let path = parse_string(&path, "data.import path")?;
1183    let format = parse_string(&format, "data.import format")?;
1184    if !format.eq_ignore_ascii_case("data") {
1185        return Err(data_error(
1186            "data.import currently supports only format='data'",
1187        ));
1188    }
1189    let source_path = parse_string(&source_path, "data.import sourcePath")?;
1190    copy_dir_recursive(&dataset_root(&source_path), &dataset_root(&path)).await?;
1191    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1192    Ok(dataset_object(&path, &manifest))
1193}
1194
1195#[runtime_builtin(
1196    name = "data.export",
1197    category = "io/data",
1198    summary = "Export dataset to target path.",
1199    keywords = "data,dataset,export",
1200    sink = true,
1201    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1202    descriptor(crate::builtins::io::data::DATA_EXPORT_DESCRIPTOR),
1203    builtin_path = "crate::builtins::io::data"
1204)]
1205async fn data_export_builtin(
1206    path: Value,
1207    format: Value,
1208    target_path: Value,
1209    _rest: Vec<Value>,
1210) -> BuiltinResult<Value> {
1211    let path = parse_string(&path, "data.export path")?;
1212    let format = parse_string(&format, "data.export format")?;
1213    if !format.eq_ignore_ascii_case("data") {
1214        return Err(data_error(
1215            "data.export currently supports only format='data'",
1216        ));
1217    }
1218    let target_path = parse_string(&target_path, "data.export targetPath")?;
1219    copy_dir_recursive(&dataset_root(&path), &dataset_root(&target_path)).await?;
1220    Ok(Value::Bool(true))
1221}
1222
1223#[runtime_builtin(
1224    name = "data.list",
1225    category = "io/data",
1226    summary = "List dataset paths under a prefix.",
1227    keywords = "data,dataset,list",
1228    type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
1229    descriptor(crate::builtins::io::data::DATA_LIST_DESCRIPTOR),
1230    builtin_path = "crate::builtins::io::data"
1231)]
1232async fn data_list_builtin(path_prefix: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1233    let prefix = parse_string(&path_prefix, "data.list prefix")?;
1234    let root = PathBuf::from(prefix);
1235    let entries = runmat_filesystem::read_dir_async(&root)
1236        .await
1237        .map_err(|err| {
1238            data_error(format!(
1239                "data.list: failed to read '{}': {err}",
1240                root.display()
1241            ))
1242        })?;
1243    let mut values = Vec::new();
1244    for entry in entries {
1245        if !entry.is_dir() {
1246            continue;
1247        }
1248        let candidate = entry.path();
1249        if candidate.extension().and_then(|s| s.to_str()) != Some("data") {
1250            continue;
1251        }
1252        if runmat_filesystem::metadata_async(candidate.join("manifest.json"))
1253            .await
1254            .is_ok()
1255        {
1256            values.push(Value::String(candidate.to_string_lossy().to_string()));
1257        }
1258    }
1259    let cols = values.len();
1260    make_cell(values, 1, cols).map_err(data_error)
1261}
1262
1263#[runtime_builtin(
1264    name = "data.inspect",
1265    category = "io/data",
1266    summary = "Inspect dataset metadata and schema fields.",
1267    keywords = "data,dataset,inspect,schema",
1268    type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
1269    descriptor(crate::builtins::io::data::DATA_INSPECT_DESCRIPTOR),
1270    builtin_path = "crate::builtins::io::data"
1271)]
1272async fn data_inspect_builtin(path: Value) -> BuiltinResult<Value> {
1273    let path = parse_string(&path, "data.inspect path")?;
1274    let root = dataset_root(&path);
1275    let manifest = read_manifest_async(&root).await?;
1276    let mut out = StructValue::new();
1277    out.fields.insert("path".to_string(), Value::String(path));
1278    out.fields
1279        .insert("id".to_string(), Value::String(manifest.dataset_id));
1280    out.fields.insert(
1281        "arrayCount".to_string(),
1282        Value::Num(manifest.arrays.len() as f64),
1283    );
1284    out.fields
1285        .insert("updatedAt".to_string(), Value::String(manifest.updated_at));
1286    Ok(Value::Struct(out))
1287}
1288
1289#[runtime_builtin(
1290    name = "Dataset.path",
1291    category = "io/data",
1292    summary = "Return dataset path.",
1293    keywords = "dataset,path",
1294    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1295    descriptor(crate::builtins::io::data::DATASET_PATH_DESCRIPTOR),
1296    builtin_path = "crate::builtins::io::data"
1297)]
1298async fn dataset_path_builtin(base: Value) -> BuiltinResult<Value> {
1299    let obj = as_object(&base, "Dataset.path")?;
1300    Ok(get_object_prop(obj, "__data_path")?.clone())
1301}
1302
1303#[runtime_builtin(
1304    name = "Dataset.id",
1305    category = "io/data",
1306    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1307    descriptor(crate::builtins::io::data::DATASET_ID_DESCRIPTOR),
1308    builtin_path = "crate::builtins::io::data"
1309)]
1310async fn dataset_id_builtin(base: Value) -> BuiltinResult<Value> {
1311    let obj = as_object(&base, "Dataset.id")?;
1312    Ok(get_object_prop(obj, "__data_id")?.clone())
1313}
1314
1315#[runtime_builtin(
1316    name = "Dataset.version",
1317    category = "io/data",
1318    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1319    descriptor(crate::builtins::io::data::DATASET_VERSION_DESCRIPTOR),
1320    builtin_path = "crate::builtins::io::data"
1321)]
1322async fn dataset_version_builtin(base: Value) -> BuiltinResult<Value> {
1323    let obj = as_object(&base, "Dataset.version")?;
1324    Ok(get_object_prop(obj, "__data_version")?.clone())
1325}
1326
1327#[runtime_builtin(
1328    name = "Dataset.arrays",
1329    category = "io/data",
1330    type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
1331    descriptor(crate::builtins::io::data::DATASET_ARRAYS_DESCRIPTOR),
1332    builtin_path = "crate::builtins::io::data"
1333)]
1334async fn dataset_arrays_builtin(base: Value) -> BuiltinResult<Value> {
1335    let path = dataset_path_from_object(&base, "Dataset.arrays")?;
1336    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1337    let values: Vec<Value> = manifest
1338        .arrays
1339        .keys()
1340        .map(|k| Value::String(k.clone()))
1341        .collect();
1342    make_cell(values.clone(), 1, values.len()).map_err(data_error)
1343}
1344
1345#[runtime_builtin(
1346    name = "Dataset.has_array",
1347    category = "io/data",
1348    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1349    descriptor(crate::builtins::io::data::DATASET_HAS_ARRAY_DESCRIPTOR),
1350    builtin_path = "crate::builtins::io::data"
1351)]
1352async fn dataset_has_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
1353    let path = dataset_path_from_object(&base, "Dataset.has_array")?;
1354    let name = parse_string(&name, "Dataset.has_array name")?;
1355    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1356    Ok(Value::Bool(manifest.arrays.contains_key(&name)))
1357}
1358
1359#[runtime_builtin(
1360    name = "Dataset.array",
1361    category = "io/data",
1362    type_resolver(crate::builtins::io::type_resolvers::data_array_type),
1363    descriptor(crate::builtins::io::data::DATASET_ARRAY_DESCRIPTOR),
1364    builtin_path = "crate::builtins::io::data"
1365)]
1366async fn dataset_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
1367    let path = dataset_path_from_object(&base, "Dataset.array")?;
1368    let name = parse_string(&name, "Dataset.array name")?;
1369    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1370    if !manifest.arrays.contains_key(&name) {
1371        return Err(data_error(format!(
1372            "Dataset.array: array '{name}' not found"
1373        )));
1374    }
1375    Ok(array_object(&path, &name))
1376}
1377
1378#[runtime_builtin(
1379    name = "Dataset.attrs",
1380    category = "io/data",
1381    type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
1382    descriptor(crate::builtins::io::data::DATASET_ATTRS_DESCRIPTOR),
1383    builtin_path = "crate::builtins::io::data"
1384)]
1385async fn dataset_attrs_builtin(base: Value) -> BuiltinResult<Value> {
1386    let path = dataset_path_from_object(&base, "Dataset.attrs")?;
1387    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1388    Ok(attrs_to_struct(&manifest.attrs))
1389}
1390
1391#[runtime_builtin(
1392    name = "Dataset.get_attr",
1393    category = "io/data",
1394    type_resolver(crate::builtins::io::type_resolvers::data_unknown_type),
1395    descriptor(crate::builtins::io::data::DATASET_GET_ATTR_DESCRIPTOR),
1396    builtin_path = "crate::builtins::io::data"
1397)]
1398async fn dataset_get_attr_builtin(
1399    base: Value,
1400    key: Value,
1401    rest: Vec<Value>,
1402) -> BuiltinResult<Value> {
1403    let path = dataset_path_from_object(&base, "Dataset.get_attr")?;
1404    let key = parse_string(&key, "Dataset.get_attr key")?;
1405    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1406    if let Some(value) = manifest.attrs.get(&key) {
1407        return Ok(json_to_value(value));
1408    }
1409    Ok(rest.first().cloned().unwrap_or(Value::Num(0.0)))
1410}
1411
1412#[runtime_builtin(
1413    name = "Dataset.set_attr",
1414    category = "io/data",
1415    sink = true,
1416    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1417    descriptor(crate::builtins::io::data::DATASET_SET_ATTR_DESCRIPTOR),
1418    builtin_path = "crate::builtins::io::data"
1419)]
1420async fn dataset_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
1421    let path = dataset_path_from_object(&base, "Dataset.set_attr")?;
1422    let key = parse_string(&key, "Dataset.set_attr key")?;
1423    let root = dataset_root(&path);
1424    let mut manifest = read_manifest_async(&root).await?;
1425    manifest.attrs.insert(key, value_to_json(&value));
1426    manifest.updated_at = now_rfc3339();
1427    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1428    write_manifest_async(&root, &manifest).await?;
1429    Ok(Value::Bool(true))
1430}
1431
1432#[runtime_builtin(
1433    name = "Dataset.set_attrs",
1434    category = "io/data",
1435    sink = true,
1436    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1437    descriptor(crate::builtins::io::data::DATASET_SET_ATTRS_DESCRIPTOR),
1438    builtin_path = "crate::builtins::io::data"
1439)]
1440async fn dataset_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
1441    let path = dataset_path_from_object(&base, "Dataset.set_attrs")?;
1442    let Value::Struct(incoming) = attrs else {
1443        return Err(data_error("Dataset.set_attrs: attrs must be a struct"));
1444    };
1445    let root = dataset_root(&path);
1446    let mut manifest = read_manifest_async(&root).await?;
1447    for (k, v) in incoming.fields {
1448        manifest.attrs.insert(k, value_to_json(&v));
1449    }
1450    manifest.updated_at = now_rfc3339();
1451    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1452    write_manifest_async(&root, &manifest).await?;
1453    Ok(Value::Bool(true))
1454}
1455
1456#[runtime_builtin(
1457    name = "Dataset.begin",
1458    category = "io/data",
1459    type_resolver(crate::builtins::io::type_resolvers::data_tx_type),
1460    descriptor(crate::builtins::io::data::DATASET_BEGIN_DESCRIPTOR),
1461    builtin_path = "crate::builtins::io::data"
1462)]
1463async fn dataset_begin_builtin(base: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1464    let path = dataset_path_from_object(&base, "Dataset.begin")?;
1465    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1466    let tx_id = start_tx(path.clone(), manifest.txn_sequence);
1467    tracing::info!(
1468        target: "runmat.data",
1469        dataset = path,
1470        tx_id = tx_id,
1471        base_sequence = manifest.txn_sequence,
1472        "data transaction begin"
1473    );
1474    Ok(transaction_object(&path, &tx_id))
1475}
1476
1477#[runtime_builtin(
1478    name = "Dataset.snapshot",
1479    category = "io/data",
1480    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1481    descriptor(crate::builtins::io::data::DATASET_SNAPSHOT_DESCRIPTOR),
1482    builtin_path = "crate::builtins::io::data"
1483)]
1484async fn dataset_snapshot_builtin(
1485    base: Value,
1486    label: Value,
1487    _rest: Vec<Value>,
1488) -> BuiltinResult<Value> {
1489    let path = dataset_path_from_object(&base, "Dataset.snapshot")?;
1490    let label = parse_string(&label, "Dataset.snapshot label")?;
1491    let root = dataset_root(&path);
1492    let snapshots = root.join(".snapshots");
1493    runmat_filesystem::create_dir_all_async(&snapshots)
1494        .await
1495        .map_err(|err| {
1496            data_error(format!(
1497                "Dataset.snapshot: failed to create snapshots dir: {err}"
1498            ))
1499        })?;
1500    let src = manifest_path(&root);
1501    let dst = snapshots.join(format!("{}.manifest.json", sanitize_label(&label)));
1502    copy_file(&src, &dst).await?;
1503    Ok(Value::String(dst.to_string_lossy().to_string()))
1504}
1505
1506#[runtime_builtin(
1507    name = "Dataset.refresh",
1508    category = "io/data",
1509    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1510    descriptor(crate::builtins::io::data::DATASET_REFRESH_DESCRIPTOR),
1511    builtin_path = "crate::builtins::io::data"
1512)]
1513async fn dataset_refresh_builtin(base: Value) -> BuiltinResult<Value> {
1514    let path = dataset_path_from_object(&base, "Dataset.refresh")?;
1515    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1516    let mut ds = dataset_object(&path, &manifest);
1517    hydrate_dataset_descriptor_async(&path, &mut ds).await;
1518    Ok(ds)
1519}
1520
1521#[runtime_builtin(
1522    name = "DataArray.name",
1523    category = "io/data",
1524    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1525    descriptor(crate::builtins::io::data::DATAARRAY_NAME_DESCRIPTOR),
1526    builtin_path = "crate::builtins::io::data"
1527)]
1528async fn data_array_name_builtin(base: Value) -> BuiltinResult<Value> {
1529    let obj = as_object(&base, "DataArray.name")?;
1530    Ok(get_object_prop(obj, "__array_name")?.clone())
1531}
1532
1533#[runtime_builtin(
1534    name = "DataArray.dtype",
1535    category = "io/data",
1536    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1537    descriptor(crate::builtins::io::data::DATAARRAY_DTYPE_DESCRIPTOR),
1538    builtin_path = "crate::builtins::io::data"
1539)]
1540async fn data_array_dtype_builtin(base: Value) -> BuiltinResult<Value> {
1541    let (path, name) = array_identity(&base, "DataArray.dtype")?;
1542    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1543    let meta = manifest
1544        .arrays
1545        .get(&name)
1546        .ok_or_else(|| data_error(format!("DataArray.dtype: array '{name}' not found")))?;
1547    Ok(Value::String(meta.dtype.clone()))
1548}
1549
1550#[runtime_builtin(
1551    name = "DataArray.shape",
1552    category = "io/data",
1553    type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
1554    descriptor(crate::builtins::io::data::DATAARRAY_SHAPE_DESCRIPTOR),
1555    builtin_path = "crate::builtins::io::data"
1556)]
1557async fn data_array_shape_builtin(base: Value) -> BuiltinResult<Value> {
1558    let (path, name) = array_identity(&base, "DataArray.shape")?;
1559    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1560    let meta = manifest
1561        .arrays
1562        .get(&name)
1563        .ok_or_else(|| data_error(format!("DataArray.shape: array '{name}' not found")))?;
1564    let values = meta.shape.iter().map(|v| *v as f64).collect::<Vec<_>>();
1565    let tensor = Tensor::new(values, vec![1, meta.shape.len()])
1566        .map_err(|err| data_error(format!("DataArray.shape: {err}")))?;
1567    Ok(Value::Tensor(tensor))
1568}
1569
1570#[runtime_builtin(
1571    name = "DataArray.rank",
1572    category = "io/data",
1573    type_resolver(crate::builtins::io::type_resolvers::data_int_type),
1574    descriptor(crate::builtins::io::data::DATAARRAY_RANK_DESCRIPTOR),
1575    builtin_path = "crate::builtins::io::data"
1576)]
1577async fn data_array_rank_builtin(base: Value) -> BuiltinResult<Value> {
1578    let (path, name) = array_identity(&base, "DataArray.rank")?;
1579    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1580    let meta = manifest
1581        .arrays
1582        .get(&name)
1583        .ok_or_else(|| data_error(format!("DataArray.rank: array '{name}' not found")))?;
1584    Ok(Value::Num(meta.shape.len() as f64))
1585}
1586
1587#[runtime_builtin(
1588    name = "DataArray.chunk_shape",
1589    category = "io/data",
1590    type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
1591    descriptor(crate::builtins::io::data::DATAARRAY_CHUNK_SHAPE_DESCRIPTOR),
1592    builtin_path = "crate::builtins::io::data"
1593)]
1594async fn data_array_chunk_shape_builtin(base: Value) -> BuiltinResult<Value> {
1595    let (path, name) = array_identity(&base, "DataArray.chunk_shape")?;
1596    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1597    let meta = manifest
1598        .arrays
1599        .get(&name)
1600        .ok_or_else(|| data_error(format!("DataArray.chunk_shape: array '{name}' not found")))?;
1601    let values = meta
1602        .chunk_shape
1603        .iter()
1604        .map(|v| *v as f64)
1605        .collect::<Vec<_>>();
1606    let tensor = Tensor::new(values, vec![1, meta.chunk_shape.len()])
1607        .map_err(|err| data_error(format!("DataArray.chunk_shape: {err}")))?;
1608    Ok(Value::Tensor(tensor))
1609}
1610
1611#[runtime_builtin(
1612    name = "DataArray.codec",
1613    category = "io/data",
1614    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1615    descriptor(crate::builtins::io::data::DATAARRAY_CODEC_DESCRIPTOR),
1616    builtin_path = "crate::builtins::io::data"
1617)]
1618async fn data_array_codec_builtin(base: Value) -> BuiltinResult<Value> {
1619    let (path, name) = array_identity(&base, "DataArray.codec")?;
1620    let manifest = read_manifest_async(&dataset_root(&path)).await?;
1621    let meta = manifest
1622        .arrays
1623        .get(&name)
1624        .ok_or_else(|| data_error(format!("DataArray.codec: array '{name}' not found")))?;
1625    Ok(Value::String(meta.codec.clone()))
1626}
1627
1628#[runtime_builtin(
1629    name = "DataArray.read",
1630    category = "io/data",
1631    type_resolver(crate::builtins::io::type_resolvers::data_tensor_type),
1632    descriptor(crate::builtins::io::data::DATAARRAY_READ_DESCRIPTOR),
1633    builtin_path = "crate::builtins::io::data"
1634)]
1635async fn data_array_read_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1636    let (path, name) = array_identity(&base, "DataArray.read")?;
1637    let root = dataset_root(&path);
1638    let manifest = read_manifest_async(&root).await?;
1639    let meta = manifest
1640        .arrays
1641        .get(&name)
1642        .ok_or_else(|| data_error(format!("DataArray.read: array '{name}' not found")))?;
1643    let payload = read_array_payload_async(&root, meta).await?;
1644    let sliced = if let Some(slice_spec) = rest.first() {
1645        read_slice_payload(&payload, slice_spec)?
1646    } else {
1647        payload
1648    };
1649    let tensor = Tensor::new(sliced.values, sliced.shape)
1650        .map_err(|err| data_error(format!("DataArray.read: {err}")))?;
1651    Ok(Value::Tensor(tensor))
1652}
1653
1654#[runtime_builtin(
1655    name = "DataArray.write",
1656    category = "io/data",
1657    sink = true,
1658    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1659    descriptor(crate::builtins::io::data::DATAARRAY_WRITE_DESCRIPTOR),
1660    builtin_path = "crate::builtins::io::data"
1661)]
1662async fn data_array_write_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1663    let (path, name) = array_identity(&base, "DataArray.write")?;
1664    let (slice_spec, value) = match rest.as_slice() {
1665        [v] => (None, v),
1666        [slice, v] => (Some(slice), v),
1667        _ => {
1668            return Err(data_error(
1669                "DataArray.write expects values or (sliceSpec, values) arguments",
1670            ))
1671        }
1672    };
1673    write_array_full_async(&path, &name, slice_spec, value).await?;
1674    Ok(Value::Bool(true))
1675}
1676
1677#[runtime_builtin(
1678    name = "DataArray.resize",
1679    category = "io/data",
1680    sink = true,
1681    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1682    descriptor(crate::builtins::io::data::DATAARRAY_RESIZE_DESCRIPTOR),
1683    builtin_path = "crate::builtins::io::data"
1684)]
1685async fn data_array_resize_builtin(
1686    base: Value,
1687    new_shape: Value,
1688    _rest: Vec<Value>,
1689) -> BuiltinResult<Value> {
1690    let (path, name) = array_identity(&base, "DataArray.resize")?;
1691    let shape = parse_shape_from_value(&new_shape)?;
1692    let root = dataset_root(&path);
1693    let mut manifest = read_manifest_async(&root).await?;
1694    let meta = manifest
1695        .arrays
1696        .get_mut(&name)
1697        .ok_or_else(|| data_error(format!("DataArray.resize: array '{name}' not found")))?;
1698    meta.shape = shape.clone();
1699    let payload = DataArrayPayload {
1700        dtype: meta.dtype.clone(),
1701        shape: shape.clone(),
1702        values: vec![0.0; shape.iter().copied().product()],
1703    };
1704    let (payload_path, chunk_index_path) =
1705        write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1706    meta.data_path = make_rel_data_path(&root, &payload_path)?;
1707    meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1708    manifest.updated_at = now_rfc3339();
1709    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1710    write_manifest_async(&root, &manifest).await?;
1711    Ok(Value::Bool(true))
1712}
1713
1714#[runtime_builtin(
1715    name = "DataArray.fill",
1716    category = "io/data",
1717    sink = true,
1718    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1719    descriptor(crate::builtins::io::data::DATAARRAY_FILL_DESCRIPTOR),
1720    builtin_path = "crate::builtins::io::data"
1721)]
1722async fn data_array_fill_builtin(
1723    base: Value,
1724    value: Value,
1725    _rest: Vec<Value>,
1726) -> BuiltinResult<Value> {
1727    let (path, name) = array_identity(&base, "DataArray.fill")?;
1728    let root = dataset_root(&path);
1729    let mut manifest = read_manifest_async(&root).await?;
1730    let meta = manifest
1731        .arrays
1732        .get_mut(&name)
1733        .ok_or_else(|| data_error(format!("DataArray.fill: array '{name}' not found")))?;
1734    let scalar = scalar_to_f64(&value)?;
1735    let payload = DataArrayPayload {
1736        dtype: meta.dtype.clone(),
1737        shape: meta.shape.clone(),
1738        values: vec![scalar; meta.shape.iter().copied().product()],
1739    };
1740    let (payload_path, chunk_index_path) =
1741        write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1742    meta.data_path = make_rel_data_path(&root, &payload_path)?;
1743    meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1744    manifest.updated_at = now_rfc3339();
1745    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1746    write_manifest_async(&root, &manifest).await?;
1747    Ok(Value::Bool(true))
1748}
1749
1750#[runtime_builtin(
1751    name = "DataTransaction.id",
1752    category = "io/data",
1753    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1754    descriptor(crate::builtins::io::data::DATATX_ID_DESCRIPTOR),
1755    builtin_path = "crate::builtins::io::data"
1756)]
1757async fn data_tx_id_builtin(base: Value) -> BuiltinResult<Value> {
1758    let obj = as_object(&base, "DataTransaction.id")?;
1759    Ok(get_object_prop(obj, "__tx_id")?.clone())
1760}
1761
1762#[runtime_builtin(
1763    name = "DataTransaction.write",
1764    category = "io/data",
1765    sink = true,
1766    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1767    descriptor(crate::builtins::io::data::DATATX_WRITE_DESCRIPTOR),
1768    builtin_path = "crate::builtins::io::data"
1769)]
1770async fn data_tx_write_builtin(
1771    base: Value,
1772    array_name: Value,
1773    slice: Value,
1774    values: Value,
1775    _rest: Vec<Value>,
1776) -> BuiltinResult<Value> {
1777    let tx_id = tx_id_from_object(&base, "DataTransaction.write")?;
1778    let array_name = parse_string(&array_name, "DataTransaction.write arrayName")?;
1779    with_tx_mut(&tx_id, |tx| {
1780        if tx.status != TxnStatus::Open {
1781            return Err(data_error("DataTransaction.write: transaction is not open"));
1782        }
1783        tx.writes.push(PendingWrite {
1784            array: array_name,
1785            slice_spec: Some(slice),
1786            value: values,
1787        });
1788        Ok(())
1789    })?;
1790    Ok(Value::Bool(true))
1791}
1792
1793#[runtime_builtin(
1794    name = "DataTransaction.set_attr",
1795    category = "io/data",
1796    sink = true,
1797    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1798    descriptor(crate::builtins::io::data::DATATX_SET_ATTR_DESCRIPTOR),
1799    builtin_path = "crate::builtins::io::data"
1800)]
1801async fn data_tx_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
1802    let tx_id = tx_id_from_object(&base, "DataTransaction.set_attr")?;
1803    let key = parse_string(&key, "DataTransaction.set_attr key")?;
1804    with_tx_mut(&tx_id, |tx| {
1805        if tx.status != TxnStatus::Open {
1806            return Err(data_error(
1807                "DataTransaction.set_attr: transaction is not open",
1808            ));
1809        }
1810        tx.attrs.insert(key, value);
1811        Ok(())
1812    })?;
1813    Ok(Value::Bool(true))
1814}
1815
1816#[runtime_builtin(
1817    name = "DataTransaction.set_attrs",
1818    category = "io/data",
1819    sink = true,
1820    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1821    descriptor(crate::builtins::io::data::DATATX_SET_ATTRS_DESCRIPTOR),
1822    builtin_path = "crate::builtins::io::data"
1823)]
1824async fn data_tx_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
1825    let tx_id = tx_id_from_object(&base, "DataTransaction.set_attrs")?;
1826    let Value::Struct(incoming) = attrs else {
1827        return Err(data_error(
1828            "DataTransaction.set_attrs: attrs must be struct",
1829        ));
1830    };
1831    with_tx_mut(&tx_id, |tx| {
1832        if tx.status != TxnStatus::Open {
1833            return Err(data_error(
1834                "DataTransaction.set_attrs: transaction is not open",
1835            ));
1836        }
1837        for (k, v) in incoming.fields {
1838            tx.attrs.insert(k, v);
1839        }
1840        Ok(())
1841    })?;
1842    Ok(Value::Bool(true))
1843}
1844
1845#[runtime_builtin(
1846    name = "DataTransaction.resize",
1847    category = "io/data",
1848    sink = true,
1849    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1850    descriptor(crate::builtins::io::data::DATATX_RESIZE_DESCRIPTOR),
1851    builtin_path = "crate::builtins::io::data"
1852)]
1853async fn data_tx_resize_builtin(
1854    base: Value,
1855    array_name: Value,
1856    new_shape: Value,
1857    _rest: Vec<Value>,
1858) -> BuiltinResult<Value> {
1859    let tx_id = tx_id_from_object(&base, "DataTransaction.resize")?;
1860    let array_name = parse_string(&array_name, "DataTransaction.resize arrayName")?;
1861    let shape = parse_shape_from_value(&new_shape)?;
1862    with_tx_mut(&tx_id, |tx| {
1863        if tx.status != TxnStatus::Open {
1864            return Err(data_error(
1865                "DataTransaction.resize: transaction is not open",
1866            ));
1867        }
1868        tx.resizes.push(PendingResize {
1869            array: array_name,
1870            shape,
1871        });
1872        Ok(())
1873    })?;
1874    Ok(Value::Bool(true))
1875}
1876
1877#[runtime_builtin(
1878    name = "DataTransaction.fill",
1879    category = "io/data",
1880    sink = true,
1881    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1882    descriptor(crate::builtins::io::data::DATATX_FILL_DESCRIPTOR),
1883    builtin_path = "crate::builtins::io::data"
1884)]
1885async fn data_tx_fill_builtin(
1886    base: Value,
1887    array_name: Value,
1888    value: Value,
1889    rest: Vec<Value>,
1890) -> BuiltinResult<Value> {
1891    let tx_id = tx_id_from_object(&base, "DataTransaction.fill")?;
1892    let array_name = parse_string(&array_name, "DataTransaction.fill arrayName")?;
1893    let slice_spec = rest.first().cloned();
1894    with_tx_mut(&tx_id, |tx| {
1895        if tx.status != TxnStatus::Open {
1896            return Err(data_error("DataTransaction.fill: transaction is not open"));
1897        }
1898        tx.fills.push(PendingFill {
1899            array: array_name,
1900            slice_spec,
1901            value,
1902        });
1903        Ok(())
1904    })?;
1905    Ok(Value::Bool(true))
1906}
1907
1908#[runtime_builtin(
1909    name = "DataTransaction.delete_array",
1910    category = "io/data",
1911    sink = true,
1912    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1913    descriptor(crate::builtins::io::data::DATATX_DELETE_ARRAY_DESCRIPTOR),
1914    builtin_path = "crate::builtins::io::data"
1915)]
1916async fn data_tx_delete_array_builtin(base: Value, array_name: Value) -> BuiltinResult<Value> {
1917    let tx_id = tx_id_from_object(&base, "DataTransaction.delete_array")?;
1918    let array_name = parse_string(&array_name, "DataTransaction.delete_array arrayName")?;
1919    with_tx_mut(&tx_id, |tx| {
1920        if tx.status != TxnStatus::Open {
1921            return Err(data_error(
1922                "DataTransaction.delete_array: transaction is not open",
1923            ));
1924        }
1925        tx.delete_arrays.push(array_name);
1926        Ok(())
1927    })?;
1928    Ok(Value::Bool(true))
1929}
1930
1931#[runtime_builtin(
1932    name = "DataTransaction.create_array",
1933    category = "io/data",
1934    sink = true,
1935    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1936    descriptor(crate::builtins::io::data::DATATX_CREATE_ARRAY_DESCRIPTOR),
1937    builtin_path = "crate::builtins::io::data"
1938)]
1939async fn data_tx_create_array_builtin(
1940    base: Value,
1941    array_name: Value,
1942    meta: Value,
1943) -> BuiltinResult<Value> {
1944    let tx_id = tx_id_from_object(&base, "DataTransaction.create_array")?;
1945    let array_name = parse_string(&array_name, "DataTransaction.create_array arrayName")?;
1946    let meta = parse_array_meta(&array_name, &meta)?;
1947    with_tx_mut(&tx_id, |tx| {
1948        if tx.status != TxnStatus::Open {
1949            return Err(data_error(
1950                "DataTransaction.create_array: transaction is not open",
1951            ));
1952        }
1953        tx.create_arrays.push(PendingCreateArray {
1954            array: array_name,
1955            meta,
1956        });
1957        Ok(())
1958    })?;
1959    Ok(Value::Bool(true))
1960}
1961
1962#[runtime_builtin(
1963    name = "DataTransaction.commit",
1964    category = "io/data",
1965    sink = true,
1966    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1967    descriptor(crate::builtins::io::data::DATATX_COMMIT_DESCRIPTOR),
1968    builtin_path = "crate::builtins::io::data"
1969)]
1970async fn data_tx_commit_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1971    let tx_id = tx_id_from_object(&base, "DataTransaction.commit")?;
1972    let (dataset_path, base_sequence, writes, resizes, fills, create_arrays, delete_arrays, attrs) =
1973        with_tx(&tx_id, |tx| {
1974            if tx.status != TxnStatus::Open {
1975                return Err(data_error(
1976                    "DataTransaction.commit: transaction is not open",
1977                ));
1978            }
1979            Ok((
1980                tx.dataset_path.clone(),
1981                tx.base_sequence,
1982                tx.writes.clone(),
1983                tx.resizes.clone(),
1984                tx.fills.clone(),
1985                tx.create_arrays.clone(),
1986                tx.delete_arrays.clone(),
1987                tx.attrs.clone(),
1988            ))
1989        })?;
1990
1991    let write_ops = writes.len();
1992    let resize_ops = resizes.len();
1993    let fill_ops = fills.len();
1994    let create_ops = create_arrays.len();
1995    let delete_ops = delete_arrays.len();
1996    let attr_updates = attrs.len();
1997
1998    let root = dataset_root(&dataset_path);
1999    let mut manifest = read_manifest_async(&root).await?;
2000    ensure_manifest_sequence(base_sequence, &manifest)?;
2001    if let Some(Value::Struct(options)) = rest.first() {
2002        if let Some(expected) = options.fields.get("if_manifest") {
2003            let expected = parse_string(expected, "DataTransaction.commit if_manifest")?;
2004            let actual = manifest_version_token(&manifest);
2005            if expected != actual {
2006                tracing::warn!(
2007                    target: "runmat.data",
2008                    tx_id = tx_id,
2009                    expected_manifest = expected,
2010                    actual_manifest = actual,
2011                    "data transaction manifest conflict"
2012                );
2013                return Err(data_error(
2014                    "MANIFEST_CONFLICT: if_manifest precondition failed",
2015                ));
2016            }
2017        }
2018    }
2019    for create in create_arrays {
2020        create_array_in_manifest(&root, &mut manifest, &create.array, create.meta).await?;
2021    }
2022    for resize in resizes {
2023        resize_array_in_manifest(&root, &mut manifest, &resize.array, resize.shape).await?;
2024    }
2025    for fill in fills {
2026        fill_array_in_manifest(
2027            &root,
2028            &mut manifest,
2029            &fill.array,
2030            fill.slice_spec.as_ref(),
2031            &fill.value,
2032        )
2033        .await?;
2034    }
2035    for write in writes {
2036        apply_write_to_manifest_async(
2037            &root,
2038            &mut manifest,
2039            &write.array,
2040            write.slice_spec.as_ref(),
2041            &write.value,
2042        )
2043        .await?;
2044    }
2045    for array_name in delete_arrays {
2046        delete_array_in_manifest_async(&root, &mut manifest, &array_name).await?;
2047    }
2048    for (k, v) in attrs {
2049        manifest.attrs.insert(k, value_to_json(&v));
2050    }
2051    manifest.updated_at = now_rfc3339();
2052    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
2053    write_manifest_async(&root, &manifest).await?;
2054    with_tx_mut(&tx_id, |tx| {
2055        tx.status = TxnStatus::Committed;
2056        Ok(())
2057    })?;
2058    tracing::info!(
2059        target: "runmat.data",
2060        dataset = dataset_path,
2061        tx_id = tx_id,
2062        write_ops = write_ops,
2063        resize_ops = resize_ops,
2064        fill_ops = fill_ops,
2065        create_ops = create_ops,
2066        delete_ops = delete_ops,
2067        attr_updates = attr_updates,
2068        next_sequence = manifest.txn_sequence,
2069        "data transaction commit"
2070    );
2071    remove_tx(&tx_id);
2072    Ok(Value::Bool(true))
2073}
2074
2075#[runtime_builtin(
2076    name = "commit",
2077    category = "io/data",
2078    sink = true,
2079    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
2080    descriptor(crate::builtins::io::data::COMMIT_ALIAS_DESCRIPTOR),
2081    builtin_path = "crate::builtins::io::data"
2082)]
2083async fn data_tx_commit_alias_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
2084    match &base {
2085        Value::Object(obj) if obj.class_name == "DataTransaction" => {
2086            data_tx_commit_builtin(base, rest).await
2087        }
2088        Value::HandleObject(handle) if handle.class_name == "DataTransaction" => {
2089            data_tx_commit_builtin(base, rest).await
2090        }
2091        _ => Err(data_error(
2092            "commit: receiver must be a DataTransaction (use tx = ds.begin())",
2093        )),
2094    }
2095}
2096
2097#[runtime_builtin(
2098    name = "DataTransaction.abort",
2099    category = "io/data",
2100    sink = true,
2101    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
2102    descriptor(crate::builtins::io::data::DATATX_ABORT_DESCRIPTOR),
2103    builtin_path = "crate::builtins::io::data"
2104)]
2105async fn data_tx_abort_builtin(base: Value) -> BuiltinResult<Value> {
2106    let tx_id = tx_id_from_object(&base, "DataTransaction.abort")?;
2107    with_tx_mut(&tx_id, |tx| {
2108        tx.status = TxnStatus::Aborted;
2109        Ok(())
2110    })?;
2111    tracing::info!(
2112        target: "runmat.data",
2113        tx_id = tx_id,
2114        "data transaction abort"
2115    );
2116    remove_tx(&tx_id);
2117    Ok(Value::Bool(true))
2118}
2119
2120#[runtime_builtin(
2121    name = "DataTransaction.status",
2122    category = "io/data",
2123    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
2124    descriptor(crate::builtins::io::data::DATATX_STATUS_DESCRIPTOR),
2125    builtin_path = "crate::builtins::io::data"
2126)]
2127async fn data_tx_status_builtin(base: Value) -> BuiltinResult<Value> {
2128    let tx_id = tx_id_from_object(&base, "DataTransaction.status")?;
2129    with_tx(&tx_id, |tx| {
2130        let status = match tx.status {
2131            TxnStatus::Open => "open",
2132            TxnStatus::Committed => "committed",
2133            TxnStatus::Aborted => "aborted",
2134        };
2135        Ok(Value::String(status.to_string()))
2136    })
2137}
2138
2139fn dataset_path_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
2140    let obj = as_object(base, context)?;
2141    parse_string(get_object_prop(obj, "__data_path")?, context)
2142}
2143
2144fn tx_id_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
2145    let obj = as_object(base, context)?;
2146    parse_string(get_object_prop(obj, "__tx_id")?, context)
2147}
2148
2149fn array_identity(base: &Value, context: &str) -> BuiltinResult<(String, String)> {
2150    let obj = as_object(base, context)?;
2151    let path = parse_string(get_object_prop(obj, "__data_path")?, context)?;
2152    let name = parse_string(get_object_prop(obj, "__array_name")?, context)?;
2153    Ok((path, name))
2154}
2155
2156fn as_object<'a>(value: &'a Value, context: &str) -> BuiltinResult<&'a ObjectInstance> {
2157    match value {
2158        Value::Object(obj) => Ok(obj),
2159        _ => Err(data_error(format!("{context}: expected object receiver"))),
2160    }
2161}
2162
2163async fn hydrate_dataset_descriptor_async(path: &str, dataset: &mut Value) {
2164    let request = runmat_filesystem::data_contract::DataManifestRequest {
2165        path: path.to_string(),
2166        version: None,
2167    };
2168    let descriptor = match runmat_filesystem::data_manifest_descriptor_async(&request).await {
2169        Ok(descriptor) => descriptor,
2170        Err(_) => return,
2171    };
2172    let Value::Object(obj) = dataset else {
2173        return;
2174    };
2175    if !descriptor.dataset_id.is_empty() {
2176        obj.properties.insert(
2177            "__data_id".to_string(),
2178            Value::String(descriptor.dataset_id),
2179        );
2180    }
2181    obj.properties.insert(
2182        "__data_version".to_string(),
2183        Value::String(format!(
2184            "{}:{}",
2185            descriptor.updated_at, descriptor.txn_sequence
2186        )),
2187    );
2188}
2189
/// Returns `label` with every character other than an ASCII letter, ASCII
/// digit, `-` or `_` replaced by `_`. The character count is preserved.
fn sanitize_label(label: &str) -> String {
    let mut cleaned = String::with_capacity(label.len());
    for symbol in label.chars() {
        let allowed = matches!(symbol, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        cleaned.push(if allowed { symbol } else { '_' });
    }
    cleaned
}
2202
2203async fn copy_file(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
2204    let bytes = runmat_filesystem::read_async(src)
2205        .await
2206        .map_err(|err| data_error(format!("failed to open '{}': {err}", src.display())))?;
2207    let parent = dst.parent().ok_or_else(|| {
2208        data_error(format!(
2209            "invalid destination path '{}': missing parent",
2210            dst.display()
2211        ))
2212    })?;
2213    runmat_filesystem::create_dir_all_async(parent)
2214        .await
2215        .map_err(|err| data_error(format!("failed to create '{}': {err}", parent.display())))?;
2216    runmat_filesystem::write_async(dst, &bytes)
2217        .await
2218        .map_err(|err| {
2219            data_error(format!(
2220                "failed to copy '{}' -> '{}': {err}",
2221                src.display(),
2222                dst.display()
2223            ))
2224        })?;
2225    Ok(())
2226}
2227
2228fn make_rel_data_path(
2229    root: &std::path::Path,
2230    payload_path: &std::path::Path,
2231) -> BuiltinResult<String> {
2232    let rel = payload_path
2233        .strip_prefix(root)
2234        .map_err(|err| data_error(format!("failed to compute relative data path: {err}")))?;
2235    Ok(rel.to_string_lossy().to_string())
2236}
2237
2238async fn create_array_in_manifest(
2239    root: &std::path::Path,
2240    manifest: &mut DataManifest,
2241    array_name: &str,
2242    mut meta: DataArrayMeta,
2243) -> BuiltinResult<()> {
2244    if manifest.arrays.contains_key(array_name) {
2245        return Err(data_error(format!(
2246            "DataTransaction.create_array: array '{array_name}' already exists"
2247        )));
2248    }
2249    let payload = DataArrayPayload {
2250        dtype: meta.dtype.clone(),
2251        shape: meta.shape.clone(),
2252        values: vec![0.0; meta.shape.iter().copied().product()],
2253    };
2254    let (payload_path, chunk_index_path) =
2255        write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
2256    meta.data_path = make_rel_data_path(root, &payload_path)?;
2257    meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2258    manifest.arrays.insert(array_name.to_string(), meta);
2259    Ok(())
2260}
2261
2262async fn resize_array_in_manifest(
2263    root: &std::path::Path,
2264    manifest: &mut DataManifest,
2265    array_name: &str,
2266    shape: Vec<usize>,
2267) -> BuiltinResult<()> {
2268    let meta = manifest
2269        .arrays
2270        .get_mut(array_name)
2271        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2272    meta.shape = shape.clone();
2273    let payload = DataArrayPayload {
2274        dtype: meta.dtype.clone(),
2275        shape: shape.clone(),
2276        values: vec![0.0; shape.iter().copied().product()],
2277    };
2278    let (payload_path, chunk_index_path) =
2279        write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
2280    meta.data_path = make_rel_data_path(root, &payload_path)?;
2281    meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2282    Ok(())
2283}
2284
2285async fn fill_array_in_manifest(
2286    root: &std::path::Path,
2287    manifest: &mut DataManifest,
2288    array_name: &str,
2289    slice_spec: Option<&Value>,
2290    value: &Value,
2291) -> BuiltinResult<()> {
2292    let meta: DataArrayMeta = manifest
2293        .arrays
2294        .get(array_name)
2295        .cloned()
2296        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2297    let scalar = scalar_to_f64(value)?;
2298    let payload = read_array_payload_async(root, &meta).await?;
2299    let next_payload = if let Some(slice_spec) = slice_spec {
2300        let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2301        let target_shape: Vec<usize> = ranges
2302            .iter()
2303            .map(|r| r.end.saturating_sub(r.start))
2304            .collect();
2305        let rhs = Value::Tensor(
2306            Tensor::new(
2307                vec![scalar; target_shape.iter().copied().product()],
2308                target_shape,
2309            )
2310            .map_err(|err| data_error(format!("DataTransaction.fill: {err}")))?,
2311        );
2312        write_slice_payload(&payload, slice_spec, &rhs)?
2313    } else {
2314        DataArrayPayload {
2315            dtype: payload.dtype,
2316            shape: payload.shape.clone(),
2317            values: vec![scalar; payload.shape.iter().copied().product()],
2318        }
2319    };
2320    let (payload_path, chunk_index_path) =
2321        write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
2322    if let Some(updated) = manifest.arrays.get_mut(array_name) {
2323        updated.shape = next_payload.shape.clone();
2324        updated.data_path = make_rel_data_path(root, &payload_path)?;
2325        updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2326    }
2327    Ok(())
2328}
2329
2330async fn delete_array_in_manifest_async(
2331    root: &std::path::Path,
2332    manifest: &mut DataManifest,
2333    array_name: &str,
2334) -> BuiltinResult<()> {
2335    let removed = manifest.arrays.remove(array_name);
2336    if removed.is_none() {
2337        return Err(data_error(format!(
2338            "DataTransaction.delete_array: array '{array_name}' not found"
2339        )));
2340    }
2341    let array_dir = root.join("arrays").join(array_name);
2342    if runmat_filesystem::metadata_async(&array_dir).await.is_ok() {
2343        runmat_filesystem::remove_dir_all_async(&array_dir)
2344            .await
2345            .map_err(|err| {
2346                data_error(format!(
2347                    "DataTransaction.delete_array: failed to remove '{}': {err}",
2348                    array_dir.display()
2349                ))
2350            })?;
2351    }
2352    Ok(())
2353}
2354
2355fn parse_array_meta(array_name: &str, meta: &Value) -> BuiltinResult<DataArrayMeta> {
2356    let Value::Struct(meta_struct) = meta else {
2357        return Err(data_error(
2358            "DataTransaction.create_array: meta must be a struct",
2359        ));
2360    };
2361    let dtype = meta_struct
2362        .fields
2363        .get("dtype")
2364        .map(|v| parse_string(v, "DataTransaction.create_array dtype"))
2365        .transpose()?
2366        .unwrap_or_else(|| "f64".to_string());
2367    let shape = meta_struct
2368        .fields
2369        .get("shape")
2370        .map(parse_shape_from_value)
2371        .transpose()?
2372        .unwrap_or_else(|| vec![0, 0]);
2373    let chunk_shape = meta_struct
2374        .fields
2375        .get("chunk")
2376        .map(parse_shape_from_value)
2377        .transpose()?
2378        .unwrap_or_else(|| default_chunk_shape(&shape));
2379    let codec = meta_struct
2380        .fields
2381        .get("codec")
2382        .map(|v| parse_string(v, "DataTransaction.create_array codec"))
2383        .transpose()?
2384        .unwrap_or_else(|| "zstd".to_string());
2385    Ok(DataArrayMeta {
2386        dtype,
2387        shape,
2388        chunk_shape,
2389        order: "column_major".to_string(),
2390        codec,
2391        chunk_index_path: Some(format!("arrays/{array_name}/chunks/index.json")),
2392        data_path: format!("arrays/{array_name}/data.f64.json"),
2393    })
2394}
2395
/// Chooses a default chunk shape for an array of the given `shape`.
///
/// * Empty shape: a single chunk dimension of 1024.
/// * One dimension: the extent clamped to `1..=65_536`.
/// * Two or more: the first two extents clamped to `1..=256`, every further
///   extent clamped to `1..=8`.
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape {
        [] => vec![1024],
        [only] => vec![(*only).clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(axis, &extent)| {
                let cap = if axis < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
2412
2413#[async_recursion::async_recursion(?Send)]
2414async fn copy_dir_recursive(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
2415    let metadata = runmat_filesystem::metadata_async(src)
2416        .await
2417        .map_err(|err| data_error(format!("failed to stat '{}': {err}", src.display())))?;
2418    if !metadata.is_dir() {
2419        return Err(data_error(format!(
2420            "expected dataset directory at '{}'",
2421            src.display()
2422        )));
2423    }
2424    runmat_filesystem::create_dir_all_async(dst)
2425        .await
2426        .map_err(|err| data_error(format!("failed to create '{}': {err}", dst.display())))?;
2427    for entry in runmat_filesystem::read_dir_async(src)
2428        .await
2429        .map_err(|err| data_error(format!("failed to read '{}': {err}", src.display())))?
2430    {
2431        let entry_src = entry.path().to_path_buf();
2432        let entry_dst = dst.join(entry.file_name());
2433        if entry.is_dir() {
2434            copy_dir_recursive(&entry_src, &entry_dst).await?;
2435            continue;
2436        }
2437        copy_file(&entry_src, &entry_dst).await?;
2438    }
2439    Ok(())
2440}
2441
2442fn parse_shape_from_value(value: &Value) -> BuiltinResult<Vec<usize>> {
2443    match value {
2444        Value::Tensor(t) => {
2445            let mut out = Vec::with_capacity(t.data.len());
2446            for v in &t.data {
2447                if !v.is_finite() || *v < 0.0 {
2448                    return Err(data_error(
2449                        "shape dimensions must be non-negative finite numbers",
2450                    ));
2451                }
2452                out.push(*v as usize);
2453            }
2454            Ok(out)
2455        }
2456        Value::Num(v) => {
2457            if !v.is_finite() || *v < 0.0 {
2458                return Err(data_error(
2459                    "shape dimensions must be non-negative finite numbers",
2460                ));
2461            }
2462            Ok(vec![*v as usize])
2463        }
2464        Value::Int(v) => {
2465            let n = v.to_i64();
2466            if n < 0 {
2467                return Err(data_error("shape dimensions must be non-negative"));
2468            }
2469            Ok(vec![n as usize])
2470        }
2471        _ => Err(data_error("shape must be a numeric vector")),
2472    }
2473}
2474
2475fn scalar_to_f64(value: &Value) -> BuiltinResult<f64> {
2476    match value {
2477        Value::Num(v) => Ok(*v),
2478        Value::Int(v) => Ok(v.to_i64() as f64),
2479        _ => Err(data_error("expected numeric scalar")),
2480    }
2481}
2482
2483async fn write_array_full_async(
2484    dataset_path: &str,
2485    array_name: &str,
2486    slice_spec: Option<&Value>,
2487    value: &Value,
2488) -> BuiltinResult<()> {
2489    let root = dataset_root(dataset_path);
2490    let mut manifest = read_manifest_async(&root).await?;
2491    apply_write_to_manifest_async(&root, &mut manifest, array_name, slice_spec, value).await?;
2492    manifest.updated_at = now_rfc3339();
2493    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
2494    write_manifest_async(&root, &manifest).await
2495}
2496
2497async fn apply_write_to_manifest_async(
2498    root: &std::path::Path,
2499    manifest: &mut DataManifest,
2500    array_name: &str,
2501    slice_spec: Option<&Value>,
2502    value: &Value,
2503) -> BuiltinResult<()> {
2504    let meta: DataArrayMeta = manifest
2505        .arrays
2506        .get(array_name)
2507        .cloned()
2508        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2509
2510    if let Some(slice_spec) = slice_spec {
2511        if apply_slice_write_chunked_async(root, manifest, array_name, &meta, slice_spec, value)
2512            .await?
2513        {
2514            return Ok(());
2515        }
2516    }
2517
2518    let payload = read_array_payload_async(root, &meta).await?;
2519    let mut next_payload = payload.clone();
2520    if let Some(slice_spec) = slice_spec {
2521        next_payload = write_slice_payload(&payload, slice_spec, value)?;
2522    } else {
2523        let (shape, values) = value_to_tensor_shape_values(value)?;
2524        next_payload.shape = shape;
2525        next_payload.values = values;
2526    }
2527
2528    let (payload_path, chunk_index_path) =
2529        write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
2530    if let Some(updated) = manifest.arrays.get_mut(array_name) {
2531        updated.shape = next_payload.shape.clone();
2532        updated.data_path = make_rel_data_path(root, &payload_path)?;
2533        updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2534    }
2535    Ok(())
2536}
2537
2538async fn apply_slice_write_chunked_async(
2539    root: &std::path::Path,
2540    manifest: &mut DataManifest,
2541    array_name: &str,
2542    meta: &DataArrayMeta,
2543    slice_spec: &Value,
2544    value: &Value,
2545) -> BuiltinResult<bool> {
2546    let Some(index_rel_path) = &meta.chunk_index_path else {
2547        return Ok(false);
2548    };
2549    let index_path = root.join(index_rel_path);
2550    if runmat_filesystem::metadata_async(&index_path)
2551        .await
2552        .is_err()
2553    {
2554        return Ok(false);
2555    }
2556    let ranges = parse_slice_spec(slice_spec, &meta.shape)?;
2557    let rhs_shape: Vec<usize> = ranges
2558        .iter()
2559        .map(|r| r.end.saturating_sub(r.start))
2560        .collect();
2561    let (actual_rhs_shape, rhs_values) = value_to_tensor_shape_values(value)?;
2562    if actual_rhs_shape != rhs_shape {
2563        return Err(data_error(format!(
2564            "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
2565            actual_rhs_shape, rhs_shape
2566        )));
2567    }
2568
2569    let index_bytes = runmat_filesystem::read_async(&index_path)
2570        .await
2571        .map_err(|err| {
2572            data_error(format!(
2573                "failed to read chunk index '{}': {err}",
2574                index_path.display()
2575            ))
2576        })?;
2577    let mut chunk_index: DataChunkIndex = serde_json::from_slice(&index_bytes).map_err(|err| {
2578        data_error(format!(
2579            "failed to parse chunk index '{}': {err}",
2580            index_path.display()
2581        ))
2582    })?;
2583
2584    let mut pos_by_key = HashMap::new();
2585    for (idx, entry) in chunk_index.chunks.iter().enumerate() {
2586        pos_by_key.insert(entry.key.clone(), idx);
2587    }
2588    let touched = touched_chunk_coords(&ranges, &meta.chunk_shape, &meta.shape);
2589    let mut upload_batch = Vec::<(DataChunkDescriptor, Vec<u8>)>::new();
2590
2591    for coords in touched {
2592        let key = chunk_key(&coords);
2593        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
2594        let chunk_extent = chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape);
2595        let intersection = chunk_intersection(&ranges, &chunk_start, &chunk_extent);
2596        if intersection.is_empty() {
2597            continue;
2598        }
2599
2600        let (entry_index, existed, mut entry, mut chunk_payload) = load_or_init_chunk(
2601            root,
2602            array_name,
2603            &key,
2604            &coords,
2605            &chunk_extent,
2606            &pos_by_key,
2607            &chunk_index,
2608        )
2609        .await?;
2610
2611        let mut local = vec![0usize; intersection.len()];
2612        let intersection_shape: Vec<usize> = intersection
2613            .iter()
2614            .map(|r| r.end.saturating_sub(r.start))
2615            .collect();
2616        loop {
2617            let mut global = Vec::with_capacity(intersection.len());
2618            for dim in 0..intersection.len() {
2619                global.push(intersection[dim].start + local[dim]);
2620            }
2621            let rhs_index: Vec<usize> = global
2622                .iter()
2623                .enumerate()
2624                .map(|(dim, g)| g.saturating_sub(ranges[dim].start))
2625                .collect();
2626            let chunk_index_local: Vec<usize> = global
2627                .iter()
2628                .enumerate()
2629                .map(|(dim, g)| g.saturating_sub(chunk_start[dim]))
2630                .collect();
2631            let rhs_linear = linear_index_column_major(&rhs_index, &rhs_shape)?;
2632            let chunk_linear = linear_index_column_major(&chunk_index_local, &chunk_extent)?;
2633            chunk_payload.values[chunk_linear] = rhs_values[rhs_linear];
2634            if !advance_index(&mut local, &intersection_shape) {
2635                break;
2636            }
2637        }
2638
2639        let chunk_bytes = serde_json::to_vec(&chunk_payload)
2640            .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
2641        let chunk_path = root.join(&entry.data_path);
2642        runmat_filesystem::write_async(&chunk_path, &chunk_bytes)
2643            .await
2644            .map_err(|err| {
2645                data_error(format!(
2646                    "failed to write chunk payload '{}': {err}",
2647                    chunk_path.display()
2648                ))
2649            })?;
2650
2651        entry.coords = coords.clone();
2652        entry.shape = chunk_extent.clone();
2653        entry.bytes_raw = chunk_bytes.len() as u64;
2654        entry.bytes_stored = chunk_bytes.len() as u64;
2655        entry.hash = sha256_hex(&chunk_bytes);
2656        if existed {
2657            chunk_index.chunks[entry_index] = entry.clone();
2658        } else {
2659            chunk_index.chunks.push(entry.clone());
2660            pos_by_key.insert(key.clone(), chunk_index.chunks.len() - 1);
2661        }
2662        upload_batch.push((
2663            DataChunkDescriptor {
2664                key: key.clone(),
2665                object_id: entry.object_id.clone(),
2666                hash: entry.hash.clone(),
2667                bytes_raw: entry.bytes_raw,
2668                bytes_stored: entry.bytes_stored,
2669            },
2670            chunk_bytes,
2671        ));
2672    }
2673
2674    maybe_upload_chunk_batch_async(root, array_name, upload_batch).await?;
2675    tracing::info!(
2676        target: "runmat.data",
2677        dataset = %root.display(),
2678        array = array_name,
2679        touched_chunks = chunk_index.chunks.len(),
2680        "chunked slice write committed"
2681    );
2682    let index_write = serde_json::to_vec(&chunk_index)
2683        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
2684    runmat_filesystem::write_async(&index_path, &index_write)
2685        .await
2686        .map_err(|err| {
2687            data_error(format!(
2688                "failed to write chunk index '{}': {err}",
2689                index_path.display()
2690            ))
2691        })?;
2692
2693    if let Some(updated) = manifest.arrays.get_mut(array_name) {
2694        updated.shape = meta.shape.clone();
2695        updated.chunk_index_path = Some(index_rel_path.clone());
2696    }
2697    Ok(true)
2698}
2699
2700fn value_to_tensor_shape_values(value: &Value) -> BuiltinResult<(Vec<usize>, Vec<f64>)> {
2701    match value {
2702        Value::Tensor(t) => Ok((t.shape.clone(), t.data.clone())),
2703        Value::Num(n) => Ok((vec![1, 1], vec![*n])),
2704        Value::Int(i) => Ok((vec![1, 1], vec![i.to_i64() as f64])),
2705        _ => Err(data_error(
2706            "DataArray.write supports tensor or numeric scalar values",
2707        )),
2708    }
2709}
2710
/// Half-open index range selected along one dimension of an array.
///
/// `parse_dim_range` produces these from one-based user indices, so both
/// bounds are zero-based and the number of selected elements is `end - start`.
2711#[derive(Clone, Copy, Debug)]
2712struct DimRange {
    /// Zero-based index of the first selected element (inclusive).
2713    start: usize,
    /// Zero-based index one past the last selected element (exclusive).
2714    end: usize,
2715}
2716
2717fn read_slice_payload(
2718    payload: &DataArrayPayload,
2719    slice_spec: &Value,
2720) -> BuiltinResult<DataArrayPayload> {
2721    let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2722    let out_shape: Vec<usize> = ranges
2723        .iter()
2724        .map(|r| r.end.saturating_sub(r.start))
2725        .collect();
2726    let mut out_values = Vec::new();
2727    let mut out_index = vec![0usize; out_shape.len()];
2728    loop {
2729        let source_index: Vec<usize> = out_index
2730            .iter()
2731            .enumerate()
2732            .map(|(dim, idx)| ranges[dim].start + *idx)
2733            .collect();
2734        let linear = linear_index_column_major(&source_index, &payload.shape)?;
2735        out_values.push(payload.values[linear]);
2736
2737        if !advance_index(&mut out_index, &out_shape) {
2738            break;
2739        }
2740    }
2741    Ok(DataArrayPayload {
2742        dtype: payload.dtype.clone(),
2743        shape: out_shape,
2744        values: out_values,
2745    })
2746}
2747
2748fn write_slice_payload(
2749    payload: &DataArrayPayload,
2750    slice_spec: &Value,
2751    rhs: &Value,
2752) -> BuiltinResult<DataArrayPayload> {
2753    let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2754    let target_shape: Vec<usize> = ranges
2755        .iter()
2756        .map(|r| r.end.saturating_sub(r.start))
2757        .collect();
2758    let (rhs_shape, rhs_values) = value_to_tensor_shape_values(rhs)?;
2759    if rhs_shape != target_shape {
2760        return Err(data_error(format!(
2761            "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
2762            rhs_shape, target_shape
2763        )));
2764    }
2765
2766    let mut next = payload.values.clone();
2767    let mut rhs_index = vec![0usize; target_shape.len()];
2768    let mut rhs_linear = 0usize;
2769    loop {
2770        let target_index: Vec<usize> = rhs_index
2771            .iter()
2772            .enumerate()
2773            .map(|(dim, idx)| ranges[dim].start + *idx)
2774            .collect();
2775        let target_linear = linear_index_column_major(&target_index, &payload.shape)?;
2776        next[target_linear] = rhs_values[rhs_linear];
2777        rhs_linear += 1;
2778
2779        if !advance_index(&mut rhs_index, &target_shape) {
2780            break;
2781        }
2782    }
2783
2784    Ok(DataArrayPayload {
2785        dtype: payload.dtype.clone(),
2786        shape: payload.shape.clone(),
2787        values: next,
2788    })
2789}
2790
2791fn parse_slice_spec(slice_spec: &Value, shape: &[usize]) -> BuiltinResult<Vec<DimRange>> {
2792    match slice_spec {
2793        Value::Cell(cell) => {
2794            if cell.data.is_empty() {
2795                return Err(data_error("INVALID_SLICE: empty slice specification"));
2796            }
2797            let mut ranges = Vec::with_capacity(shape.len());
2798            for (dim, extent) in shape.iter().enumerate() {
2799                if let Some(item) = cell.data.get(dim).map(|v| &**v) {
2800                    ranges.push(parse_dim_range(item, *extent)?);
2801                } else {
2802                    ranges.push(DimRange {
2803                        start: 0,
2804                        end: *extent,
2805                    });
2806                }
2807            }
2808            Ok(ranges)
2809        }
2810        Value::String(s) if s == ":" => Ok(shape
2811            .iter()
2812            .map(|extent| DimRange {
2813                start: 0,
2814                end: *extent,
2815            })
2816            .collect()),
2817        _ => Err(data_error(
2818            "INVALID_SLICE: slice must be a cell spec like {1:10, :} or ':'",
2819        )),
2820    }
2821}
2822
2823fn parse_dim_range(value: &Value, extent: usize) -> BuiltinResult<DimRange> {
2824    if extent == 0 {
2825        return Ok(DimRange { start: 0, end: 0 });
2826    }
2827    match value {
2828        Value::String(s) if s == ":" => Ok(DimRange {
2829            start: 0,
2830            end: extent,
2831        }),
2832        Value::Num(n) => {
2833            let idx = (*n as isize) - 1;
2834            if idx < 0 || idx as usize >= extent {
2835                return Err(data_error("INVALID_SLICE: index out of bounds"));
2836            }
2837            Ok(DimRange {
2838                start: idx as usize,
2839                end: idx as usize + 1,
2840            })
2841        }
2842        Value::Int(i) => {
2843            let idx = i.to_i64() - 1;
2844            if idx < 0 || idx as usize >= extent {
2845                return Err(data_error("INVALID_SLICE: index out of bounds"));
2846            }
2847            Ok(DimRange {
2848                start: idx as usize,
2849                end: idx as usize + 1,
2850            })
2851        }
2852        Value::Tensor(t) if t.data.len() == 2 => {
2853            let start = (t.data[0] as isize) - 1;
2854            let end_inclusive = (t.data[1] as isize) - 1;
2855            if start < 0 || end_inclusive < start || end_inclusive as usize >= extent {
2856                return Err(data_error("INVALID_SLICE: range out of bounds"));
2857            }
2858            Ok(DimRange {
2859                start: start as usize,
2860                end: end_inclusive as usize + 1,
2861            })
2862        }
2863        _ => Err(data_error(
2864            "INVALID_SLICE: dimension must be ':', scalar index, or [start end] range",
2865        )),
2866    }
2867}
2868
2869fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
2870    if index.len() != shape.len() {
2871        return Err(data_error("INVALID_SLICE: rank mismatch"));
2872    }
2873    let mut stride = 1usize;
2874    let mut linear = 0usize;
2875    for (idx, extent) in index.iter().zip(shape.iter()) {
2876        if *idx >= *extent {
2877            return Err(data_error("INVALID_SLICE: index out of bounds"));
2878        }
2879        linear += idx * stride;
2880        stride = stride.saturating_mul(*extent);
2881    }
2882    Ok(linear)
2883}
2884
/// Steps `index` to the next position inside `shape`, with the first dimension
/// varying fastest.
///
/// Returns `true` when a new position was produced. Returns `false` once every
/// dimension has wrapped around, in which case `index` is all zeros again. An
/// empty `shape` has no successor and always yields `false`.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    for (axis, &extent) in shape.iter().enumerate() {
        let slot = &mut index[axis];
        *slot += 1;
        if *slot < extent {
            return true;
        }
        // This dimension overflowed: reset it and carry into the next one.
        *slot = 0;
    }
    false
}
2898
/// Builds the textual key of a chunk by joining its grid coordinates with `.`
/// (for example `[1, 0, 2]` becomes `"1.0.2"`). Empty coordinates give `""`.
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (position, coord) in coords.iter().enumerate() {
        if position > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
2906
/// Translates chunk grid coordinates into the element offset at which each
/// chunk begins.
///
/// Every coordinate is multiplied by the chunk length of its dimension; a
/// missing or zero chunk length is treated as 1.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(coords.len());
    for (axis, &coord) in coords.iter().enumerate() {
        let step = match chunk_shape.get(axis) {
            Some(&len) if len > 0 => len,
            _ => 1,
        };
        starts.push(coord * step);
    }
    starts
}
2914
/// Computes the actual extent of the chunk beginning at `start`, clipping the
/// nominal chunk length at the array boundary given by `shape`.
///
/// A missing or zero chunk length is treated as 1. A start at or beyond the
/// array boundary produces an extent of 0 for that dimension.
fn chunk_extent_for_start(start: &[usize], chunk_shape: &[usize], shape: &[usize]) -> Vec<usize> {
    let mut extents = Vec::with_capacity(start.len());
    for (axis, &origin) in start.iter().enumerate() {
        let nominal = match chunk_shape.get(axis) {
            Some(&len) if len > 0 => len,
            _ => 1,
        };
        // Edge chunks stop at the array boundary.
        let limit = (origin + nominal).min(shape[axis]);
        extents.push(limit.saturating_sub(origin));
    }
    extents
}
2926
2927fn chunk_intersection(
2928    ranges: &[DimRange],
2929    chunk_start: &[usize],
2930    chunk_extent: &[usize],
2931) -> Vec<DimRange> {
2932    let mut out = Vec::with_capacity(ranges.len());
2933    for dim in 0..ranges.len() {
2934        let c_start = chunk_start[dim];
2935        let c_end = c_start + chunk_extent[dim];
2936        let start = ranges[dim].start.max(c_start);
2937        let end = ranges[dim].end.min(c_end);
2938        if start >= end {
2939            return Vec::new();
2940        }
2941        out.push(DimRange { start, end });
2942    }
2943    out
2944}
2945
2946fn touched_chunk_coords(
2947    ranges: &[DimRange],
2948    chunk_shape: &[usize],
2949    shape: &[usize],
2950) -> Vec<Vec<usize>> {
2951    let mut span = Vec::with_capacity(ranges.len());
2952    let mut begin = Vec::with_capacity(ranges.len());
2953    for dim in 0..ranges.len() {
2954        if shape[dim] == 0 {
2955            return Vec::new();
2956        }
2957        let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
2958        let first = ranges[dim].start / chunk;
2959        let last = (ranges[dim].end.saturating_sub(1)) / chunk;
2960        begin.push(first);
2961        span.push(last.saturating_sub(first) + 1);
2962    }
2963    let mut local = vec![0usize; span.len()];
2964    let mut out = Vec::new();
2965    loop {
2966        out.push(
2967            local
2968                .iter()
2969                .enumerate()
2970                .map(|(dim, v)| begin[dim] + *v)
2971                .collect::<Vec<_>>(),
2972        );
2973        if !advance_index(&mut local, &span) {
2974            break;
2975        }
2976    }
2977    out
2978}
2979
2980async fn maybe_upload_chunk_batch_async(
2981    root: &std::path::Path,
2982    array_name: &str,
2983    batch: Vec<(DataChunkDescriptor, Vec<u8>)>,
2984) -> BuiltinResult<()> {
2985    if batch.is_empty() {
2986        return Ok(());
2987    }
2988    let request = DataChunkUploadRequest {
2989        dataset_path: root.to_string_lossy().to_string(),
2990        array: array_name.to_string(),
2991        chunks: batch.iter().map(|(d, _)| d.clone()).collect(),
2992    };
2993    let targets = match runmat_filesystem::data_chunk_upload_targets_async(&request).await {
2994        Ok(targets) => targets,
2995        Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
2996        Err(err) => {
2997            return Err(data_error(format!(
2998                "failed to request data chunk upload targets: {err}"
2999            )))
3000        }
3001    };
3002    for (descriptor, bytes) in batch {
3003        let target = targets
3004            .iter()
3005            .find(|t| t.key == descriptor.key)
3006            .ok_or_else(|| {
3007                data_error(format!(
3008                    "missing upload target for chunk '{}'",
3009                    descriptor.key
3010                ))
3011            })?;
3012        runmat_filesystem::data_upload_chunk_async(target, &bytes)
3013            .await
3014            .map_err(|err| {
3015                data_error(format!(
3016                    "failed to upload chunk '{}': {err}",
3017                    descriptor.key
3018                ))
3019            })?;
3020        tracing::info!(
3021            target: "runmat.data",
3022            dataset = %root.display(),
3023            array = array_name,
3024            chunk_key = descriptor.key,
3025            bytes = bytes.len(),
3026            "chunk upload completed"
3027        );
3028    }
3029    Ok(())
3030}
3031
/// Returns the dataset-relative path of the JSON file that stores the chunk
/// `object_id` of array `array_name`.
fn chunk_rel_path(array_name: &str, object_id: &str) -> String {
    format!("arrays/{}/chunks/{}.json", array_name, object_id)
}
3035
3036async fn load_or_init_chunk(
3037    root: &std::path::Path,
3038    array_name: &str,
3039    key: &str,
3040    coords: &[usize],
3041    chunk_extent: &[usize],
3042    pos_by_key: &HashMap<String, usize>,
3043    chunk_index: &DataChunkIndex,
3044) -> BuiltinResult<(usize, bool, DataChunkIndexEntry, DataArrayPayload)> {
3045    if let Some(index) = pos_by_key.get(key).copied() {
3046        let entry = chunk_index
3047            .chunks
3048            .get(index)
3049            .cloned()
3050            .ok_or_else(|| data_error(format!("chunk index missing key '{key}'")))?;
3051        let bytes = runmat_filesystem::read_async(root.join(&entry.data_path))
3052            .await
3053            .map_err(|err| {
3054                data_error(format!(
3055                    "failed to read chunk payload '{}': {err}",
3056                    entry.data_path
3057                ))
3058            })?;
3059        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
3060            data_error(format!(
3061                "failed to parse chunk payload '{}': {err}",
3062                entry.data_path
3063            ))
3064        })?;
3065        return Ok((index, true, entry, payload));
3066    }
3067
3068    let object_id = format!("obj_{}", key.replace('.', "_"));
3069    let entry = DataChunkIndexEntry {
3070        key: key.to_string(),
3071        object_id: object_id.clone(),
3072        hash: String::new(),
3073        bytes_raw: 0,
3074        bytes_stored: 0,
3075        coords: coords.to_vec(),
3076        shape: chunk_extent.to_vec(),
3077        data_path: chunk_rel_path(array_name, &object_id),
3078    };
3079    let payload = DataArrayPayload {
3080        dtype: "f64".to_string(),
3081        shape: chunk_extent.to_vec(),
3082        values: vec![0.0; chunk_extent.iter().copied().product()],
3083    };
3084    Ok((chunk_index.chunks.len(), false, entry, payload))
3085}
3086
3087fn attrs_to_struct(attrs: &BTreeMap<String, serde_json::Value>) -> Value {
3088    let mut out = StructValue::new();
3089    for (k, v) in attrs {
3090        out.fields.insert(k.clone(), json_to_value(v));
3091    }
3092    Value::Struct(out)
3093}
3094
3095fn value_to_json(value: &Value) -> serde_json::Value {
3096    match value {
3097        Value::String(s) => serde_json::Value::String(s.clone()),
3098        Value::CharArray(ca) => serde_json::Value::String(ca.to_string()),
3099        Value::Num(n) => serde_json::json!(n),
3100        Value::Int(i) => serde_json::json!(i.to_i64()),
3101        Value::Bool(b) => serde_json::json!(b),
3102        _ => serde_json::Value::String(format!("{value:?}")),
3103    }
3104}
3105
3106fn json_to_value(value: &serde_json::Value) -> Value {
3107    match value {
3108        serde_json::Value::Bool(b) => Value::Bool(*b),
3109        serde_json::Value::Number(n) => Value::Num(n.as_f64().unwrap_or_default()),
3110        serde_json::Value::String(s) => Value::String(s.clone()),
3111        serde_json::Value::Array(arr) => {
3112            let vals = arr.iter().map(json_to_value).collect::<Vec<_>>();
3113            crate::make_cell(vals.clone(), 1, vals.len())
3114                .unwrap_or_else(|_| Value::String("<invalid-array>".to_string()))
3115        }
3116        serde_json::Value::Object(map) => {
3117            let mut s = StructValue::new();
3118            for (k, v) in map {
3119                s.fields.insert(k.clone(), json_to_value(v));
3120            }
3121            Value::Struct(s)
3122        }
3123        serde_json::Value::Null => Value::String("".to_string()),
3124    }
3125}
3126
3127#[cfg(test)]
3128mod tests {
3129    use super::*;
3130    use crate::dispatcher::call_builtin;
3131    use async_trait::async_trait;
3132    use axum::extract::{Query, State};
3133    use axum::http::{HeaderMap, StatusCode};
3134    use axum::routing::{post, put};
3135    use axum::{Json, Router};
3136    use runmat_builtins::CellArray;
3137    use runmat_filesystem::data_contract::{
3138        DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3139    };
3140    use runmat_filesystem::{
3141        DirEntry, FileHandle, FsMetadata, FsProvider, NativeFsProvider, OpenFlags,
3142    };
3143    use serde::Deserialize;
3144    use std::path::Path;
3145    use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
3146    use tokio::runtime::Runtime;
3147    use tokio::sync::oneshot;
3148
3149    fn serial_test_guard() -> MutexGuard<'static, ()> {
3150        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
3151        LOCK.get_or_init(|| Mutex::new(()))
3152            .lock()
3153            .expect("data test serial lock poisoned")
3154    }
3155
3156    #[test]
3157    fn io_data_descriptors_cover_constructor_and_transaction_surface() {
3158        let data_labels: Vec<&str> = DATA_CREATE_DESCRIPTOR
3159            .signatures
3160            .iter()
3161            .map(|sig| sig.label)
3162            .collect();
3163        assert!(data_labels.contains(&"ds = data.create(path, schema, Name, Value, ...)"));
3164
3165        let read_labels: Vec<&str> = DATAARRAY_READ_DESCRIPTOR
3166            .signatures
3167            .iter()
3168            .map(|sig| sig.label)
3169            .collect();
3170        assert!(read_labels.contains(&"X = DataArray.read(arr, sliceSpec)"));
3171
3172        let write_labels: Vec<&str> = DATAARRAY_WRITE_DESCRIPTOR
3173            .signatures
3174            .iter()
3175            .map(|sig| sig.label)
3176            .collect();
3177        assert!(write_labels.contains(&"tf = DataArray.write(arr, values)"));
3178        assert!(write_labels.contains(&"tf = DataArray.write(arr, sliceSpec, values)"));
3179
3180        let tx_labels: Vec<&str> = DATATX_COMMIT_DESCRIPTOR
3181            .signatures
3182            .iter()
3183            .map(|sig| sig.label)
3184            .collect();
3185        assert!(tx_labels.contains(&"tf = DataTransaction.commit(tx, Name, Value, ...)"));
3186    }
3187
    /// Test filesystem provider that forwards ordinary file operations to the
    /// native provider and records chunk uploads instead of sending them.
3188    #[derive(Default)]
3189    struct CountingDataUploadProvider {
        /// Native provider that serves all regular filesystem calls.
3190        inner: NativeFsProvider,
        /// Key of every chunk passed to `data_upload_chunk`, in call order.
3191        uploaded_keys: Arc<Mutex<Vec<String>>>,
3192    }
3193
    /// Test filesystem provider that forwards ordinary file operations to the
    /// native provider and performs chunk upload negotiation and transfer over
    /// HTTP against `base_url`.
3194    struct HttpDataUploadProvider {
        /// Native provider that serves all regular filesystem calls.
3195        inner: NativeFsProvider,
        /// Prefix used when building the upload-target and upload URLs.
3196        base_url: String,
        /// Blocking HTTP client used for the upload requests.
3197        client: reqwest::blocking::Client,
3198    }
3199
3200    impl HttpDataUploadProvider {
3201        fn new(base_url: String) -> Self {
3202            Self {
3203                inner: NativeFsProvider,
3204                base_url,
3205                client: reqwest::blocking::Client::new(),
3206            }
3207        }
3208    }
3209
3210    #[async_trait(?Send)]
3211    impl FsProvider for HttpDataUploadProvider {
3212        fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
3213            self.inner.open(path, flags)
3214        }
3215
3216        async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
3217            self.inner.read(path).await
3218        }
3219
3220        async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
3221            self.inner.write(path, data).await
3222        }
3223
3224        async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
3225            self.inner.remove_file(path).await
3226        }
3227
3228        async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3229            self.inner.metadata(path).await
3230        }
3231
3232        async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3233            self.inner.symlink_metadata(path).await
3234        }
3235
3236        async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
3237            self.inner.read_dir(path).await
3238        }
3239
3240        async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
3241            self.inner.canonicalize(path).await
3242        }
3243
3244        async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
3245            self.inner.create_dir(path).await
3246        }
3247
3248        async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
3249            self.inner.create_dir_all(path).await
3250        }
3251
3252        async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
3253            self.inner.remove_dir(path).await
3254        }
3255
3256        async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
3257            self.inner.remove_dir_all(path).await
3258        }
3259
3260        async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
3261            self.inner.rename(from, to).await
3262        }
3263
3264        async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
3265            self.inner.set_readonly(path, readonly).await
3266        }
3267
3268        async fn data_manifest_descriptor(
3269            &self,
3270            request: &DataManifestRequest,
3271        ) -> std::io::Result<DataManifestDescriptor> {
3272            self.inner.data_manifest_descriptor(request).await
3273        }
3274
3275        async fn data_chunk_upload_targets(
3276            &self,
3277            request: &DataChunkUploadRequest,
3278        ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
3279            #[derive(Deserialize)]
3280            struct UploadTargetsResponse {
3281                targets: Vec<DataChunkUploadTarget>,
3282            }
3283            let url = format!("{}/data/chunks/upload-targets", self.base_url);
3284            let response = self
3285                .client
3286                .post(url)
3287                .json(request)
3288                .send()
3289                .map_err(|err| std::io::Error::other(err.to_string()))?;
3290            if !response.status().is_success() {
3291                return Err(std::io::Error::other(format!(
3292                    "upload targets request failed: {}",
3293                    response.status()
3294                )));
3295            }
3296            let parsed: UploadTargetsResponse = response
3297                .json()
3298                .map_err(|err| std::io::Error::other(err.to_string()))?;
3299            Ok(parsed.targets)
3300        }
3301
3302        async fn data_upload_chunk(
3303            &self,
3304            target: &DataChunkUploadTarget,
3305            data: &[u8],
3306        ) -> std::io::Result<()> {
3307            let upload_url = if let Some(key) = target.upload_url.strip_prefix("upload://") {
3308                format!("{}/upload?key={}", self.base_url, key)
3309            } else {
3310                target.upload_url.clone()
3311            };
3312            let method = reqwest::Method::from_bytes(target.method.as_bytes())
3313                .map_err(|err| std::io::Error::other(err.to_string()))?;
3314            let mut request = self.client.request(method, &upload_url);
3315            for (k, v) in &target.headers {
3316                request = request.header(k, v);
3317            }
3318            let response = request
3319                .body(data.to_vec())
3320                .send()
3321                .map_err(|err| std::io::Error::other(err.to_string()))?;
3322            if !response.status().is_success() {
3323                return Err(std::io::Error::other(format!(
3324                    "chunk upload failed: {}",
3325                    response.status()
3326                )));
3327            }
3328            Ok(())
3329        }
3330    }
3331
    /// Shared state of the test upload server.
3332    #[derive(Clone, Default)]
3333    struct UploadHarness {
        /// Chunk keys received by `upload_handler`, in arrival order.
3334        uploads: Arc<Mutex<Vec<String>>>,
3335    }
3336
    /// Query-string parameters accepted by `upload_handler`.
3337    #[derive(Deserialize)]
3338    struct UploadChunkQuery {
        /// Key of the chunk being uploaded (the `key` query parameter).
3339        key: String,
3340    }
3341
3342    async fn upload_targets_handler(
3343        Json(req): Json<DataChunkUploadRequest>,
3344    ) -> Result<Json<serde_json::Value>, StatusCode> {
3345        let targets = req
3346            .chunks
3347            .iter()
3348            .map(|chunk| {
3349                serde_json::json!({
3350                    "key": chunk.key,
3351                    "method": "PUT",
3352                    "upload_url": format!("upload://{}", chunk.key),
3353                    "headers": {
3354                        "x-runmat-hash": chunk.hash,
3355                    }
3356                })
3357            })
3358            .collect::<Vec<_>>();
3359        Ok(Json(serde_json::json!({ "targets": targets })))
3360    }
3361
3362    async fn upload_handler(
3363        State(harness): State<UploadHarness>,
3364        Query(query): Query<UploadChunkQuery>,
3365        headers: HeaderMap,
3366        body: axum::body::Bytes,
3367    ) -> Result<(), StatusCode> {
3368        if body.is_empty() {
3369            return Err(StatusCode::BAD_REQUEST);
3370        }
3371        if headers.get("x-runmat-hash").is_none() {
3372            return Err(StatusCode::BAD_REQUEST);
3373        }
3374        let mut guard = harness.uploads.lock().expect("uploads lock poisoned");
3375        guard.push(query.key);
3376        Ok(())
3377    }
3378
3379    fn spawn_upload_server() -> (
3380        String,
3381        Arc<Mutex<Vec<String>>>,
3382        Runtime,
3383        oneshot::Sender<()>,
3384    ) {
3385        let harness = UploadHarness::default();
3386        let uploads = Arc::clone(&harness.uploads);
3387        let runtime = Runtime::new().expect("tokio runtime");
3388        let (addr, shutdown_tx) = runtime.block_on(async move {
3389            let listener = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
3390                .await
3391                .expect("bind upload server");
3392            let addr = listener.local_addr().expect("local addr");
3393            let app = Router::new()
3394                .route("/data/chunks/upload-targets", post(upload_targets_handler))
3395                .route("/upload", put(upload_handler))
3396                .with_state(harness);
3397            let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
3398            let server = axum::serve(listener, app).with_graceful_shutdown(async {
3399                let _ = shutdown_rx.await;
3400            });
3401            tokio::spawn(async move {
3402                let _ = server.await;
3403            });
3404            (addr, shutdown_tx)
3405        });
3406        (format!("http://{}", addr), uploads, runtime, shutdown_tx)
3407    }
3408
3409    impl CountingDataUploadProvider {
3410        fn uploaded_keys(&self) -> Arc<Mutex<Vec<String>>> {
3411            Arc::clone(&self.uploaded_keys)
3412        }
3413    }
3414
3415    #[async_trait(?Send)]
3416    impl FsProvider for CountingDataUploadProvider {
3417        fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
3418            self.inner.open(path, flags)
3419        }
3420
3421        async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
3422            self.inner.read(path).await
3423        }
3424
3425        async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
3426            self.inner.write(path, data).await
3427        }
3428
3429        async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
3430            self.inner.remove_file(path).await
3431        }
3432
3433        async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3434            self.inner.metadata(path).await
3435        }
3436
3437        async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3438            self.inner.symlink_metadata(path).await
3439        }
3440
3441        async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
3442            self.inner.read_dir(path).await
3443        }
3444
3445        async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
3446            self.inner.canonicalize(path).await
3447        }
3448
3449        async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
3450            self.inner.create_dir(path).await
3451        }
3452
3453        async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
3454            self.inner.create_dir_all(path).await
3455        }
3456
3457        async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
3458            self.inner.remove_dir(path).await
3459        }
3460
3461        async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
3462            self.inner.remove_dir_all(path).await
3463        }
3464
3465        async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
3466            self.inner.rename(from, to).await
3467        }
3468
3469        async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
3470            self.inner.set_readonly(path, readonly).await
3471        }
3472
3473        async fn data_manifest_descriptor(
3474            &self,
3475            request: &DataManifestRequest,
3476        ) -> std::io::Result<DataManifestDescriptor> {
3477            self.inner.data_manifest_descriptor(request).await
3478        }
3479
3480        async fn data_chunk_upload_targets(
3481            &self,
3482            request: &DataChunkUploadRequest,
3483        ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
3484            Ok(request
3485                .chunks
3486                .iter()
3487                .map(|chunk| DataChunkUploadTarget {
3488                    key: chunk.key.clone(),
3489                    method: "PUT".to_string(),
3490                    upload_url: format!("count://{}", chunk.object_id),
3491                    headers: std::collections::HashMap::new(),
3492                })
3493                .collect())
3494        }
3495
3496        async fn data_upload_chunk(
3497            &self,
3498            target: &DataChunkUploadTarget,
3499            _data: &[u8],
3500        ) -> std::io::Result<()> {
3501            let mut guard = match self.uploaded_keys.lock() {
3502                Ok(guard) => guard,
3503                Err(poisoned) => poisoned.into_inner(),
3504            };
3505            guard.push(target.key.clone());
3506            Ok(())
3507        }
3508    }
3509
3510    #[test]
3511    fn create_open_write_read_dataset() {
3512        let _serial = serial_test_guard();
3513        let dir = tempfile::tempdir().expect("tempdir");
3514        let path = dir.path().join("sample.data").to_string_lossy().to_string();
3515
3516        let mut array_meta = StructValue::new();
3517        array_meta
3518            .fields
3519            .insert("dtype".to_string(), Value::String("f64".to_string()));
3520        array_meta.fields.insert(
3521            "shape".to_string(),
3522            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
3523        );
3524        let mut arrays = StructValue::new();
3525        arrays
3526            .fields
3527            .insert("temperature".to_string(), Value::Struct(array_meta));
3528        let mut schema = StructValue::new();
3529        schema
3530            .fields
3531            .insert("arrays".to_string(), Value::Struct(arrays));
3532
3533        let ds = call_builtin(
3534            "data.create",
3535            &[
3536                Value::String(path.clone()),
3537                Value::Struct(schema),
3538                Value::Cell(runmat_builtins::CellArray::new(vec![], 1, 0).expect("cell")),
3539            ],
3540        )
3541        .expect("create dataset");
3542
3543        let arr = call_builtin(
3544            "Dataset.array",
3545            &[ds, Value::String("temperature".to_string())],
3546        )
3547        .expect("dataset array");
3548        let write_tensor = Tensor::new(vec![1.0, 2.0, 3.0, 4.0], vec![2, 2]).expect("write tensor");
3549        call_builtin(
3550            "DataArray.write",
3551            &[arr.clone(), Value::Tensor(write_tensor)],
3552        )
3553        .expect("write array");
3554
3555        let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
3556        let Value::Tensor(t) = read_back else {
3557            panic!("expected tensor");
3558        };
3559        assert_eq!(t.shape, vec![2, 2]);
3560        assert_eq!(t.data, vec![1.0, 2.0, 3.0, 4.0]);
3561    }
3562
3563    #[test]
3564    fn write_and_read_slice_payload() {
3565        let _serial = serial_test_guard();
3566        let dir = tempfile::tempdir().expect("tempdir");
3567        let path = dir.path().join("slice.data").to_string_lossy().to_string();
3568
3569        let mut array_meta = StructValue::new();
3570        array_meta
3571            .fields
3572            .insert("dtype".to_string(), Value::String("f64".to_string()));
3573        array_meta.fields.insert(
3574            "shape".to_string(),
3575            Value::Tensor(Tensor::new(vec![3.0, 3.0], vec![1, 2]).expect("shape tensor")),
3576        );
3577        let mut arrays = StructValue::new();
3578        arrays
3579            .fields
3580            .insert("temperature".to_string(), Value::Struct(array_meta));
3581        let mut schema = StructValue::new();
3582        schema
3583            .fields
3584            .insert("arrays".to_string(), Value::Struct(arrays));
3585
3586        let ds = call_builtin(
3587            "data.create",
3588            &[
3589                Value::String(path.clone()),
3590                Value::Struct(schema),
3591                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3592            ],
3593        )
3594        .expect("create dataset");
3595        let arr = call_builtin(
3596            "Dataset.array",
3597            &[ds, Value::String("temperature".to_string())],
3598        )
3599        .expect("dataset array");
3600
3601        let slice = Value::Cell(
3602            CellArray::new(
3603                vec![
3604                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3605                    Value::String(":".to_string()),
3606                ],
3607                1,
3608                2,
3609            )
3610            .expect("slice cell"),
3611        );
3612        let rhs = Value::Tensor(
3613            Tensor::new(vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0], vec![2, 3]).expect("rhs"),
3614        );
3615        call_builtin("DataArray.write", &[arr.clone(), slice.clone(), rhs]).expect("slice write");
3616
3617        let read_back = call_builtin("DataArray.read", &[arr.clone(), slice]).expect("slice read");
3618        let Value::Tensor(t) = read_back else {
3619            panic!("expected tensor");
3620        };
3621        assert_eq!(t.shape, vec![2, 3]);
3622        assert_eq!(t.data, vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0]);
3623    }
3624
3625    #[test]
3626    fn slice_write_updates_only_touched_chunks() {
3627        let _serial = serial_test_guard();
3628        let dir = tempfile::tempdir().expect("tempdir");
3629        let path = dir
3630            .path()
3631            .join("chunked.data")
3632            .to_string_lossy()
3633            .to_string();
3634
3635        let mut array_meta = StructValue::new();
3636        array_meta
3637            .fields
3638            .insert("dtype".to_string(), Value::String("f64".to_string()));
3639        array_meta.fields.insert(
3640            "shape".to_string(),
3641            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3642        );
3643        array_meta.fields.insert(
3644            "chunk".to_string(),
3645            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3646        );
3647        let mut arrays = StructValue::new();
3648        arrays
3649            .fields
3650            .insert("temperature".to_string(), Value::Struct(array_meta));
3651        let mut schema = StructValue::new();
3652        schema
3653            .fields
3654            .insert("arrays".to_string(), Value::Struct(arrays));
3655
3656        let ds = call_builtin(
3657            "data.create",
3658            &[
3659                Value::String(path.clone()),
3660                Value::Struct(schema),
3661                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3662            ],
3663        )
3664        .expect("create dataset");
3665        let arr = call_builtin(
3666            "Dataset.array",
3667            &[ds, Value::String("temperature".to_string())],
3668        )
3669        .expect("dataset array");
3670
3671        let full = Value::Tensor(
3672            Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4]).expect("full tensor"),
3673        );
3674        call_builtin("DataArray.write", &[arr.clone(), full]).expect("initial write");
3675
3676        let root = std::path::PathBuf::from(&path);
3677        let untouched_path = root.join("arrays/temperature/chunks/obj_1_1.json");
3678        let touched_path = root.join("arrays/temperature/chunks/obj_0_0.json");
3679        let untouched_before =
3680            futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
3681                .expect("read untouched before");
3682        let touched_before =
3683            futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
3684                .expect("read touched before");
3685
3686        let slice = Value::Cell(
3687            CellArray::new(
3688                vec![
3689                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3690                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3691                ],
3692                1,
3693                2,
3694            )
3695            .expect("slice cell"),
3696        );
3697        let rhs =
3698            Value::Tensor(Tensor::new(vec![99.0, 98.0, 97.0, 96.0], vec![2, 2]).expect("rhs"));
3699        call_builtin("DataArray.write", &[arr.clone(), slice, rhs]).expect("slice write");
3700
3701        let untouched_after =
3702            futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
3703                .expect("read untouched after");
3704        let touched_after =
3705            futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
3706                .expect("read touched after");
3707        assert_eq!(untouched_before, untouched_after);
3708        assert_ne!(touched_before, touched_after);
3709    }
3710
3711    #[test]
3712    fn slice_write_uploads_only_touched_chunk_targets() {
3713        let _serial = serial_test_guard();
3714        let provider = Arc::new(CountingDataUploadProvider::default());
3715        let uploaded = provider.uploaded_keys();
3716        let _guard = runmat_filesystem::replace_provider(provider);
3717
3718        let dir = tempfile::tempdir().expect("tempdir");
3719        let path = dir
3720            .path()
3721            .join("remote-chunked.data")
3722            .to_string_lossy()
3723            .to_string();
3724
3725        let mut array_meta = StructValue::new();
3726        array_meta
3727            .fields
3728            .insert("dtype".to_string(), Value::String("f64".to_string()));
3729        array_meta.fields.insert(
3730            "shape".to_string(),
3731            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3732        );
3733        array_meta.fields.insert(
3734            "chunk".to_string(),
3735            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3736        );
3737        let mut arrays = StructValue::new();
3738        arrays
3739            .fields
3740            .insert("temperature".to_string(), Value::Struct(array_meta));
3741        let mut schema = StructValue::new();
3742        schema
3743            .fields
3744            .insert("arrays".to_string(), Value::Struct(arrays));
3745
3746        let ds = call_builtin(
3747            "data.create",
3748            &[
3749                Value::String(path.clone()),
3750                Value::Struct(schema),
3751                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3752            ],
3753        )
3754        .expect("create dataset");
3755        let arr = call_builtin(
3756            "Dataset.array",
3757            &[ds, Value::String("temperature".to_string())],
3758        )
3759        .expect("dataset array");
3760
3761        call_builtin(
3762            "DataArray.write",
3763            &[
3764                arr.clone(),
3765                Value::Tensor(
3766                    Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
3767                        .expect("full tensor"),
3768                ),
3769            ],
3770        )
3771        .expect("initial write");
3772
3773        let manifest =
3774            futures::executor::block_on(crate::data::read_manifest_async(&dataset_root(&path)))
3775                .expect("manifest after initial write");
3776        let meta = manifest
3777            .arrays
3778            .get("temperature")
3779            .expect("temperature meta");
3780        let chunk_index_path =
3781            dataset_root(&path).join(meta.chunk_index_path.clone().expect("chunk index path"));
3782        assert!(
3783            futures::executor::block_on(runmat_filesystem::metadata_async(&chunk_index_path))
3784                .is_ok()
3785        );
3786
3787        {
3788            let mut keys = uploaded.lock().expect("uploaded keys lock");
3789            keys.clear();
3790        }
3791
3792        let slice = Value::Cell(
3793            CellArray::new(
3794                vec![
3795                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3796                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3797                ],
3798                1,
3799                2,
3800            )
3801            .expect("slice cell"),
3802        );
3803        let rhs = Value::Tensor(Tensor::new(vec![9.0, 8.0, 7.0, 6.0], vec![2, 2]).expect("rhs"));
3804        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
3805
3806        let keys = uploaded.lock().expect("uploaded keys lock");
3807        assert_eq!(keys.as_slice(), ["0.0".to_string()].as_slice());
3808    }
3809
3810    #[test]
3811    fn slice_write_uploads_expected_cross_boundary_chunk_targets() {
3812        let _serial = serial_test_guard();
3813        let provider = Arc::new(CountingDataUploadProvider::default());
3814        let uploaded = provider.uploaded_keys();
3815        let _guard = runmat_filesystem::replace_provider(provider);
3816
3817        let dir = tempfile::tempdir().expect("tempdir");
3818        let path = dir
3819            .path()
3820            .join("remote-chunked-boundary.data")
3821            .to_string_lossy()
3822            .to_string();
3823
3824        let mut array_meta = StructValue::new();
3825        array_meta
3826            .fields
3827            .insert("dtype".to_string(), Value::String("f64".to_string()));
3828        array_meta.fields.insert(
3829            "shape".to_string(),
3830            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3831        );
3832        array_meta.fields.insert(
3833            "chunk".to_string(),
3834            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3835        );
3836        let mut arrays = StructValue::new();
3837        arrays
3838            .fields
3839            .insert("temperature".to_string(), Value::Struct(array_meta));
3840        let mut schema = StructValue::new();
3841        schema
3842            .fields
3843            .insert("arrays".to_string(), Value::Struct(arrays));
3844
3845        let ds = call_builtin(
3846            "data.create",
3847            &[
3848                Value::String(path.clone()),
3849                Value::Struct(schema),
3850                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3851            ],
3852        )
3853        .expect("create dataset");
3854        let arr = call_builtin(
3855            "Dataset.array",
3856            &[ds, Value::String("temperature".to_string())],
3857        )
3858        .expect("dataset array");
3859
3860        call_builtin(
3861            "DataArray.write",
3862            &[
3863                arr.clone(),
3864                Value::Tensor(
3865                    Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
3866                        .expect("full tensor"),
3867                ),
3868            ],
3869        )
3870        .expect("initial write");
3871        {
3872            let mut keys = uploaded.lock().expect("uploaded keys lock");
3873            keys.clear();
3874        }
3875
3876        let slice = Value::Cell(
3877            CellArray::new(
3878                vec![
3879                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3880                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3881                ],
3882                1,
3883                2,
3884            )
3885            .expect("slice cell"),
3886        );
3887        let rhs =
3888            Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
3889        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
3890
3891        let mut keys = uploaded.lock().expect("uploaded keys lock").clone();
3892        keys.sort();
3893        keys.dedup();
3894        assert_eq!(
3895            keys.as_slice(),
3896            [
3897                "0.0".to_string(),
3898                "0.1".to_string(),
3899                "1.0".to_string(),
3900                "1.1".to_string(),
3901            ]
3902            .as_slice()
3903        );
3904    }
3905
3906    #[test]
3907    fn slice_write_hits_http_server_data_endpoints_with_expected_keys() {
3908        let _serial = serial_test_guard();
3909        let (base_url, uploads, runtime, shutdown_tx) = spawn_upload_server();
3910        let provider = Arc::new(HttpDataUploadProvider::new(base_url));
3911        let _guard = runmat_filesystem::replace_provider(provider);
3912
3913        let dir = tempfile::tempdir().expect("tempdir");
3914        let path = dir
3915            .path()
3916            .join("http-endpoint.data")
3917            .to_string_lossy()
3918            .to_string();
3919
3920        let mut array_meta = StructValue::new();
3921        array_meta
3922            .fields
3923            .insert("dtype".to_string(), Value::String("f64".to_string()));
3924        array_meta.fields.insert(
3925            "shape".to_string(),
3926            Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3927        );
3928        array_meta.fields.insert(
3929            "chunk".to_string(),
3930            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3931        );
3932        let mut arrays = StructValue::new();
3933        arrays
3934            .fields
3935            .insert("temperature".to_string(), Value::Struct(array_meta));
3936        let mut schema = StructValue::new();
3937        schema
3938            .fields
3939            .insert("arrays".to_string(), Value::Struct(arrays));
3940
3941        let ds = call_builtin(
3942            "data.create",
3943            &[
3944                Value::String(path.clone()),
3945                Value::Struct(schema),
3946                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3947            ],
3948        )
3949        .expect("create dataset");
3950        let arr = call_builtin(
3951            "Dataset.array",
3952            &[ds, Value::String("temperature".to_string())],
3953        )
3954        .expect("dataset array");
3955
3956        call_builtin(
3957            "DataArray.write",
3958            &[
3959                arr.clone(),
3960                Value::Tensor(
3961                    Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
3962                        .expect("full tensor"),
3963                ),
3964            ],
3965        )
3966        .expect("initial write");
3967
3968        {
3969            let mut keys = uploads.lock().expect("uploads lock");
3970            keys.clear();
3971        }
3972
3973        let slice = Value::Cell(
3974            CellArray::new(
3975                vec![
3976                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3977                    Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3978                ],
3979                1,
3980                2,
3981            )
3982            .expect("slice cell"),
3983        );
3984        let rhs =
3985            Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
3986        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
3987
3988        let mut keys = uploads.lock().expect("uploads lock").clone();
3989        keys.sort();
3990        keys.dedup();
3991        assert_eq!(
3992            keys.as_slice(),
3993            [
3994                "0.0".to_string(),
3995                "0.1".to_string(),
3996                "1.0".to_string(),
3997                "1.1".to_string(),
3998            ]
3999            .as_slice()
4000        );
4001
4002        let _ = shutdown_tx.send(());
4003        drop(runtime);
4004    }
4005
4006    #[test]
4007    fn tx_create_resize_fill_and_delete_array() {
4008        let _serial = serial_test_guard();
4009        let dir = tempfile::tempdir().expect("tempdir");
4010        let path = dir.path().join("tx-ops.data").to_string_lossy().to_string();
4011
4012        let mut arrays = StructValue::new();
4013        let mut array_meta = StructValue::new();
4014        array_meta
4015            .fields
4016            .insert("dtype".to_string(), Value::String("f64".to_string()));
4017        array_meta.fields.insert(
4018            "shape".to_string(),
4019            Value::Tensor(Tensor::new(vec![1.0, 1.0], vec![1, 2]).expect("shape tensor")),
4020        );
4021        arrays
4022            .fields
4023            .insert("base".to_string(), Value::Struct(array_meta));
4024        let mut schema = StructValue::new();
4025        schema
4026            .fields
4027            .insert("arrays".to_string(), Value::Struct(arrays));
4028
4029        let ds = call_builtin(
4030            "data.create",
4031            &[
4032                Value::String(path.clone()),
4033                Value::Struct(schema),
4034                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
4035            ],
4036        )
4037        .expect("create dataset");
4038
4039        let tx = call_builtin("Dataset.begin", &[ds]).expect("begin tx");
4040        let mut new_meta = StructValue::new();
4041        new_meta
4042            .fields
4043            .insert("dtype".to_string(), Value::String("f64".to_string()));
4044        new_meta.fields.insert(
4045            "shape".to_string(),
4046            Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
4047        );
4048        call_builtin(
4049            "DataTransaction.create_array",
4050            &[
4051                tx.clone(),
4052                Value::String("new_array".to_string()),
4053                Value::Struct(new_meta),
4054            ],
4055        )
4056        .expect("create array in tx");
4057        call_builtin(
4058            "DataTransaction.resize",
4059            &[
4060                tx.clone(),
4061                Value::String("new_array".to_string()),
4062                Value::Tensor(Tensor::new(vec![3.0, 1.0], vec![1, 2]).expect("shape tensor")),
4063            ],
4064        )
4065        .expect("resize array in tx");
4066        call_builtin(
4067            "DataTransaction.fill",
4068            &[
4069                tx.clone(),
4070                Value::String("new_array".to_string()),
4071                Value::Num(7.0),
4072            ],
4073        )
4074        .expect("fill array in tx");
4075        call_builtin(
4076            "DataTransaction.delete_array",
4077            &[tx.clone(), Value::String("base".to_string())],
4078        )
4079        .expect("delete array in tx");
4080        call_builtin("DataTransaction.commit", &[tx]).expect("commit tx");
4081
4082        let ds = call_builtin(
4083            "data.open",
4084            &[
4085                Value::String(path),
4086                Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
4087            ],
4088        )
4089        .expect("open dataset");
4090        let has_base = call_builtin(
4091            "Dataset.has_array",
4092            &[ds.clone(), Value::String("base".to_string())],
4093        )
4094        .expect("has base");
4095        assert_eq!(has_base, Value::Bool(false));
4096        let arr = call_builtin(
4097            "Dataset.array",
4098            &[ds, Value::String("new_array".to_string())],
4099        )
4100        .expect("new array");
4101        let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
4102        let Value::Tensor(t) = read_back else {
4103            panic!("expected tensor");
4104        };
4105        assert_eq!(t.shape, vec![3, 1]);
4106        assert_eq!(t.data, vec![7.0, 7.0, 7.0]);
4107    }
4108}