1use std::collections::BTreeMap;
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use runmat_builtins::{
8 BuiltinCompletionPolicy, BuiltinDescriptor, BuiltinErrorDescriptor, BuiltinOutputMode,
9 BuiltinParamArity, BuiltinParamDescriptor, BuiltinParamType, BuiltinSignatureDescriptor,
10 ObjectInstance, StructValue, Tensor, Value,
11};
12use runmat_filesystem::data_contract::{DataChunkDescriptor, DataChunkUploadRequest};
13use runmat_macros::runtime_builtin;
14
15use crate::builtins::common::spec::{
16 BroadcastSemantics, BuiltinFusionSpec, BuiltinGpuSpec, ConstantStrategy, GpuOpKind,
17 ReductionNaN, ResidencyPolicy, ShapeRequirements,
18};
19use crate::data::{
20 array_object, data_error, dataset_object, dataset_root, ensure_manifest_sequence,
21 get_object_prop, manifest_path, manifest_version_token, now_rfc3339, parse_schema,
22 parse_string, read_array_payload_async, read_manifest_async, remove_tx, sha256_hex, start_tx,
23 transaction_object, with_tx, with_tx_mut, write_array_payload_async, write_manifest_async,
24 DataArrayMeta, DataArrayPayload, DataChunkIndex, DataChunkIndexEntry, DataManifest,
25 PendingCreateArray, PendingFill, PendingResize, PendingWrite, TxnStatus,
26};
27use crate::{make_cell, BuiltinResult};
28
// GPU provider metadata registered for the whole `data.*` builtin family.
// The spec lists no supported precisions and no provider hooks, and uses
// `ResidencyPolicy::GatherImmediately`; as the `notes` field states, these
// builtins are host I/O and metadata orchestration.
#[runmat_macros::register_gpu_spec(builtin_path = "crate::builtins::io::data")]
pub const GPU_SPEC: BuiltinGpuSpec = BuiltinGpuSpec {
    name: "data.*",
    op_kind: GpuOpKind::Custom("io-data"),
    supported_precisions: &[],
    broadcast: BroadcastSemantics::None,
    provider_hooks: &[],
    constant_strategy: ConstantStrategy::InlineLiteral,
    residency: ResidencyPolicy::GatherImmediately,
    nan_mode: ReductionNaN::Include,
    two_pass_threshold: None,
    workgroup_size: None,
    accepts_nan_mode: false,
    notes: "Dataset operations are host I/O and metadata orchestration.",
};
44
// Fusion metadata registered for the whole `data.*` builtin family.
// No elementwise or reduction template is provided; as the `notes` field
// states, data builtins are side-effecting and not fusible.
#[runmat_macros::register_fusion_spec(builtin_path = "crate::builtins::io::data")]
pub const FUSION_SPEC: BuiltinFusionSpec = BuiltinFusionSpec {
    name: "data.*",
    shape: ShapeRequirements::Any,
    constant_strategy: ConstantStrategy::InlineLiteral,
    elementwise: None,
    reduction: None,
    emits_nan: false,
    notes: "Data builtins are side-effecting and not fusible.",
};
55
/// Error descriptor for invalid arguments, receiver objects, or option
/// grammar (code `RM.DATA.INVALID_ARGUMENT`).
const DATA_ERROR_INVALID_ARGUMENT: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
    code: "RM.DATA.INVALID_ARGUMENT",
    identifier: Some("RunMat:data:InvalidArgument"),
    when: "Arguments, receiver object, or option grammar are invalid for the requested data API.",
    message: "data: invalid argument",
};

/// Error descriptor for a dataset/array/transaction reference that is
/// missing or cannot be resolved (code `RM.DATA.NOT_FOUND`).
const DATA_ERROR_NOT_FOUND: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
    code: "RM.DATA.NOT_FOUND",
    identifier: Some("RunMat:data:NotFound"),
    when: "Referenced dataset/array/transaction object is missing or cannot be resolved.",
    message: "data: requested object not found",
};

/// Error descriptor for unexpected filesystem/manifest/chunk failures
/// (code `RM.DATA.INTERNAL`).
const DATA_ERROR_INTERNAL: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
    code: "RM.DATA.INTERNAL",
    identifier: Some("RunMat:data:Internal"),
    when: "Filesystem/manifest/chunk processing fails unexpectedly during data API execution.",
    message: "data: internal operation failed",
};

/// The error set attached to every descriptor built in this file
/// (referenced as `errors: &DATA_DESCRIPTOR_ERRORS`).
const DATA_DESCRIPTOR_ERRORS: [BuiltinErrorDescriptor; 3] = [
    DATA_ERROR_INVALID_ARGUMENT,
    DATA_ERROR_NOT_FOUND,
    DATA_ERROR_INTERNAL,
];
82
83const OUT_DATASET: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
84 name: "ds",
85 ty: BuiltinParamType::Any,
86 arity: BuiltinParamArity::Required,
87 default: None,
88 description: "Dataset handle object.",
89}];
90
91const OUT_ARRAY: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
92 name: "arr",
93 ty: BuiltinParamType::Any,
94 arity: BuiltinParamArity::Required,
95 default: None,
96 description: "DataArray handle object.",
97}];
98
99const OUT_TX: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
100 name: "tx",
101 ty: BuiltinParamType::Any,
102 arity: BuiltinParamArity::Required,
103 default: None,
104 description: "DataTransaction handle object.",
105}];
106
107const OUT_BOOL: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
108 name: "ok",
109 ty: BuiltinParamType::LogicalArray,
110 arity: BuiltinParamArity::Required,
111 default: None,
112 description: "Logical success/result flag.",
113}];
114
115const OUT_STRING: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
116 name: "s",
117 ty: BuiltinParamType::StringScalar,
118 arity: BuiltinParamArity::Required,
119 default: None,
120 description: "String scalar result.",
121}];
122
123const OUT_STRUCT: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
124 name: "S",
125 ty: BuiltinParamType::Any,
126 arity: BuiltinParamArity::Required,
127 default: None,
128 description: "Struct result.",
129}];
130
131const OUT_CELL: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
132 name: "C",
133 ty: BuiltinParamType::Any,
134 arity: BuiltinParamArity::Required,
135 default: None,
136 description: "Cell-array result.",
137}];
138
139const OUT_VALUE: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
140 name: "value",
141 ty: BuiltinParamType::Any,
142 arity: BuiltinParamArity::Required,
143 default: None,
144 description: "Value result.",
145}];
146
147const OUT_TENSOR: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
148 name: "X",
149 ty: BuiltinParamType::NumericArray,
150 arity: BuiltinParamArity::Required,
151 default: None,
152 description: "Numeric tensor result.",
153}];
154
155const IN_PATH_SCHEMA_REST: [BuiltinParamDescriptor; 3] = [
156 BuiltinParamDescriptor {
157 name: "path",
158 ty: BuiltinParamType::StringScalar,
159 arity: BuiltinParamArity::Required,
160 default: None,
161 description: "Dataset path (.data).",
162 },
163 BuiltinParamDescriptor {
164 name: "schema",
165 ty: BuiltinParamType::Any,
166 arity: BuiltinParamArity::Required,
167 default: None,
168 description: "Dataset schema struct.",
169 },
170 BuiltinParamDescriptor {
171 name: "options",
172 ty: BuiltinParamType::Any,
173 arity: BuiltinParamArity::Variadic,
174 default: None,
175 description: "Reserved name/value options.",
176 },
177];
178
179const IN_PATH_REST: [BuiltinParamDescriptor; 2] = [
180 BuiltinParamDescriptor {
181 name: "path",
182 ty: BuiltinParamType::StringScalar,
183 arity: BuiltinParamArity::Required,
184 default: None,
185 description: "Dataset path (.data).",
186 },
187 BuiltinParamDescriptor {
188 name: "options",
189 ty: BuiltinParamType::Any,
190 arity: BuiltinParamArity::Variadic,
191 default: None,
192 description: "Reserved name/value options.",
193 },
194];
195
196const IN_FROM_TO_REST: [BuiltinParamDescriptor; 3] = [
197 BuiltinParamDescriptor {
198 name: "fromPath",
199 ty: BuiltinParamType::StringScalar,
200 arity: BuiltinParamArity::Required,
201 default: None,
202 description: "Source dataset path.",
203 },
204 BuiltinParamDescriptor {
205 name: "toPath",
206 ty: BuiltinParamType::StringScalar,
207 arity: BuiltinParamArity::Required,
208 default: None,
209 description: "Destination dataset path.",
210 },
211 BuiltinParamDescriptor {
212 name: "options",
213 ty: BuiltinParamType::Any,
214 arity: BuiltinParamArity::Variadic,
215 default: None,
216 description: "Reserved name/value options.",
217 },
218];
219
220const IN_PATH_FORMAT_TARGET_REST: [BuiltinParamDescriptor; 4] = [
221 BuiltinParamDescriptor {
222 name: "path",
223 ty: BuiltinParamType::StringScalar,
224 arity: BuiltinParamArity::Required,
225 default: None,
226 description: "Dataset path (.data).",
227 },
228 BuiltinParamDescriptor {
229 name: "format",
230 ty: BuiltinParamType::StringScalar,
231 arity: BuiltinParamArity::Required,
232 default: None,
233 description: "Format token (currently 'data').",
234 },
235 BuiltinParamDescriptor {
236 name: "targetPath",
237 ty: BuiltinParamType::StringScalar,
238 arity: BuiltinParamArity::Required,
239 default: None,
240 description: "Target dataset path.",
241 },
242 BuiltinParamDescriptor {
243 name: "options",
244 ty: BuiltinParamType::Any,
245 arity: BuiltinParamArity::Variadic,
246 default: None,
247 description: "Reserved name/value options.",
248 },
249];
250
251const IN_PREFIX_REST: [BuiltinParamDescriptor; 2] = [
252 BuiltinParamDescriptor {
253 name: "prefix",
254 ty: BuiltinParamType::StringScalar,
255 arity: BuiltinParamArity::Required,
256 default: None,
257 description: "Filesystem prefix to scan for datasets.",
258 },
259 BuiltinParamDescriptor {
260 name: "options",
261 ty: BuiltinParamType::Any,
262 arity: BuiltinParamArity::Variadic,
263 default: None,
264 description: "Reserved name/value options.",
265 },
266];
267
268const IN_BASE: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
269 name: "obj",
270 ty: BuiltinParamType::Any,
271 arity: BuiltinParamArity::Required,
272 default: None,
273 description: "Receiver object.",
274}];
275
276const IN_BASE_NAME: [BuiltinParamDescriptor; 2] = [
277 BuiltinParamDescriptor {
278 name: "obj",
279 ty: BuiltinParamType::Any,
280 arity: BuiltinParamArity::Required,
281 default: None,
282 description: "Receiver object.",
283 },
284 BuiltinParamDescriptor {
285 name: "name",
286 ty: BuiltinParamType::StringScalar,
287 arity: BuiltinParamArity::Required,
288 default: None,
289 description: "Name key/array identifier.",
290 },
291];
292
293const IN_BASE_KEY_DEFAULT: [BuiltinParamDescriptor; 3] = [
294 BuiltinParamDescriptor {
295 name: "obj",
296 ty: BuiltinParamType::Any,
297 arity: BuiltinParamArity::Required,
298 default: None,
299 description: "Receiver object.",
300 },
301 BuiltinParamDescriptor {
302 name: "key",
303 ty: BuiltinParamType::StringScalar,
304 arity: BuiltinParamArity::Required,
305 default: None,
306 description: "Attribute key.",
307 },
308 BuiltinParamDescriptor {
309 name: "defaultValue",
310 ty: BuiltinParamType::Any,
311 arity: BuiltinParamArity::Optional,
312 default: Some("0"),
313 description: "Fallback value when key is absent.",
314 },
315];
316
317const IN_BASE_KEY_VALUE: [BuiltinParamDescriptor; 3] = [
318 BuiltinParamDescriptor {
319 name: "obj",
320 ty: BuiltinParamType::Any,
321 arity: BuiltinParamArity::Required,
322 default: None,
323 description: "Receiver object.",
324 },
325 BuiltinParamDescriptor {
326 name: "key",
327 ty: BuiltinParamType::StringScalar,
328 arity: BuiltinParamArity::Required,
329 default: None,
330 description: "Attribute key.",
331 },
332 BuiltinParamDescriptor {
333 name: "value",
334 ty: BuiltinParamType::Any,
335 arity: BuiltinParamArity::Required,
336 default: None,
337 description: "Attribute value.",
338 },
339];
340
341const IN_BASE_ATTRS: [BuiltinParamDescriptor; 2] = [
342 BuiltinParamDescriptor {
343 name: "obj",
344 ty: BuiltinParamType::Any,
345 arity: BuiltinParamArity::Required,
346 default: None,
347 description: "Receiver object.",
348 },
349 BuiltinParamDescriptor {
350 name: "attrs",
351 ty: BuiltinParamType::Any,
352 arity: BuiltinParamArity::Required,
353 default: None,
354 description: "Struct of attribute updates.",
355 },
356];
357
358const IN_BASE_LABEL_REST: [BuiltinParamDescriptor; 3] = [
359 BuiltinParamDescriptor {
360 name: "obj",
361 ty: BuiltinParamType::Any,
362 arity: BuiltinParamArity::Required,
363 default: None,
364 description: "Receiver object.",
365 },
366 BuiltinParamDescriptor {
367 name: "label",
368 ty: BuiltinParamType::StringScalar,
369 arity: BuiltinParamArity::Required,
370 default: None,
371 description: "Snapshot label.",
372 },
373 BuiltinParamDescriptor {
374 name: "options",
375 ty: BuiltinParamType::Any,
376 arity: BuiltinParamArity::Variadic,
377 default: None,
378 description: "Reserved name/value options.",
379 },
380];
381
382const IN_BASE_REST: [BuiltinParamDescriptor; 2] = [
383 BuiltinParamDescriptor {
384 name: "obj",
385 ty: BuiltinParamType::Any,
386 arity: BuiltinParamArity::Required,
387 default: None,
388 description: "Receiver object.",
389 },
390 BuiltinParamDescriptor {
391 name: "options",
392 ty: BuiltinParamType::Any,
393 arity: BuiltinParamArity::Variadic,
394 default: None,
395 description: "Reserved name/value options.",
396 },
397];
398
399const IN_BASE_SLICE_OPTIONAL: [BuiltinParamDescriptor; 2] = [
400 BuiltinParamDescriptor {
401 name: "obj",
402 ty: BuiltinParamType::Any,
403 arity: BuiltinParamArity::Required,
404 default: None,
405 description: "DataArray receiver object.",
406 },
407 BuiltinParamDescriptor {
408 name: "sliceSpec",
409 ty: BuiltinParamType::Any,
410 arity: BuiltinParamArity::Optional,
411 default: None,
412 description: "Optional slice specification.",
413 },
414];
415
416const IN_BASE_VALUES: [BuiltinParamDescriptor; 2] = [
417 BuiltinParamDescriptor {
418 name: "obj",
419 ty: BuiltinParamType::Any,
420 arity: BuiltinParamArity::Required,
421 default: None,
422 description: "DataArray receiver object.",
423 },
424 BuiltinParamDescriptor {
425 name: "values",
426 ty: BuiltinParamType::Any,
427 arity: BuiltinParamArity::Required,
428 default: None,
429 description: "Full-array values payload.",
430 },
431];
432
433const IN_BASE_SLICE_VALUES: [BuiltinParamDescriptor; 3] = [
434 BuiltinParamDescriptor {
435 name: "obj",
436 ty: BuiltinParamType::Any,
437 arity: BuiltinParamArity::Required,
438 default: None,
439 description: "DataArray receiver object.",
440 },
441 BuiltinParamDescriptor {
442 name: "sliceSpec",
443 ty: BuiltinParamType::Any,
444 arity: BuiltinParamArity::Required,
445 default: None,
446 description: "Slice specification.",
447 },
448 BuiltinParamDescriptor {
449 name: "values",
450 ty: BuiltinParamType::Any,
451 arity: BuiltinParamArity::Required,
452 default: None,
453 description: "Slice values payload.",
454 },
455];
456
457const IN_BASE_NEW_SHAPE_REST: [BuiltinParamDescriptor; 3] = [
458 BuiltinParamDescriptor {
459 name: "obj",
460 ty: BuiltinParamType::Any,
461 arity: BuiltinParamArity::Required,
462 default: None,
463 description: "DataArray receiver object.",
464 },
465 BuiltinParamDescriptor {
466 name: "newShape",
467 ty: BuiltinParamType::Any,
468 arity: BuiltinParamArity::Required,
469 default: None,
470 description: "New array shape.",
471 },
472 BuiltinParamDescriptor {
473 name: "options",
474 ty: BuiltinParamType::Any,
475 arity: BuiltinParamArity::Variadic,
476 default: None,
477 description: "Reserved name/value options.",
478 },
479];
480
481const IN_BASE_VALUE_REST: [BuiltinParamDescriptor; 3] = [
482 BuiltinParamDescriptor {
483 name: "obj",
484 ty: BuiltinParamType::Any,
485 arity: BuiltinParamArity::Required,
486 default: None,
487 description: "DataArray receiver object.",
488 },
489 BuiltinParamDescriptor {
490 name: "value",
491 ty: BuiltinParamType::Any,
492 arity: BuiltinParamArity::Required,
493 default: None,
494 description: "Fill value.",
495 },
496 BuiltinParamDescriptor {
497 name: "options",
498 ty: BuiltinParamType::Any,
499 arity: BuiltinParamArity::Variadic,
500 default: None,
501 description: "Reserved name/value options.",
502 },
503];
504
505const IN_TX_WRITE: [BuiltinParamDescriptor; 5] = [
506 BuiltinParamDescriptor {
507 name: "tx",
508 ty: BuiltinParamType::Any,
509 arity: BuiltinParamArity::Required,
510 default: None,
511 description: "DataTransaction receiver.",
512 },
513 BuiltinParamDescriptor {
514 name: "arrayName",
515 ty: BuiltinParamType::StringScalar,
516 arity: BuiltinParamArity::Required,
517 default: None,
518 description: "Target array name.",
519 },
520 BuiltinParamDescriptor {
521 name: "sliceSpec",
522 ty: BuiltinParamType::Any,
523 arity: BuiltinParamArity::Required,
524 default: None,
525 description: "Slice specification.",
526 },
527 BuiltinParamDescriptor {
528 name: "values",
529 ty: BuiltinParamType::Any,
530 arity: BuiltinParamArity::Required,
531 default: None,
532 description: "Values payload.",
533 },
534 BuiltinParamDescriptor {
535 name: "options",
536 ty: BuiltinParamType::Any,
537 arity: BuiltinParamArity::Variadic,
538 default: None,
539 description: "Reserved name/value options.",
540 },
541];
542
543const IN_TX_ARRAY_SHAPE_REST: [BuiltinParamDescriptor; 4] = [
544 BuiltinParamDescriptor {
545 name: "tx",
546 ty: BuiltinParamType::Any,
547 arity: BuiltinParamArity::Required,
548 default: None,
549 description: "DataTransaction receiver.",
550 },
551 BuiltinParamDescriptor {
552 name: "arrayName",
553 ty: BuiltinParamType::StringScalar,
554 arity: BuiltinParamArity::Required,
555 default: None,
556 description: "Target array name.",
557 },
558 BuiltinParamDescriptor {
559 name: "newShape",
560 ty: BuiltinParamType::Any,
561 arity: BuiltinParamArity::Required,
562 default: None,
563 description: "New shape vector.",
564 },
565 BuiltinParamDescriptor {
566 name: "options",
567 ty: BuiltinParamType::Any,
568 arity: BuiltinParamArity::Variadic,
569 default: None,
570 description: "Reserved name/value options.",
571 },
572];
573
574const IN_TX_ARRAY_VALUE_SLICE_OPT: [BuiltinParamDescriptor; 4] = [
575 BuiltinParamDescriptor {
576 name: "tx",
577 ty: BuiltinParamType::Any,
578 arity: BuiltinParamArity::Required,
579 default: None,
580 description: "DataTransaction receiver.",
581 },
582 BuiltinParamDescriptor {
583 name: "arrayName",
584 ty: BuiltinParamType::StringScalar,
585 arity: BuiltinParamArity::Required,
586 default: None,
587 description: "Target array name.",
588 },
589 BuiltinParamDescriptor {
590 name: "value",
591 ty: BuiltinParamType::Any,
592 arity: BuiltinParamArity::Required,
593 default: None,
594 description: "Fill value.",
595 },
596 BuiltinParamDescriptor {
597 name: "sliceSpec",
598 ty: BuiltinParamType::Any,
599 arity: BuiltinParamArity::Optional,
600 default: None,
601 description: "Optional slice specification.",
602 },
603];
604
605const IN_TX_ARRAY_NAME: [BuiltinParamDescriptor; 2] = [
606 BuiltinParamDescriptor {
607 name: "tx",
608 ty: BuiltinParamType::Any,
609 arity: BuiltinParamArity::Required,
610 default: None,
611 description: "DataTransaction receiver.",
612 },
613 BuiltinParamDescriptor {
614 name: "arrayName",
615 ty: BuiltinParamType::StringScalar,
616 arity: BuiltinParamArity::Required,
617 default: None,
618 description: "Target array name.",
619 },
620];
621
622const IN_TX_ARRAY_META: [BuiltinParamDescriptor; 3] = [
623 BuiltinParamDescriptor {
624 name: "tx",
625 ty: BuiltinParamType::Any,
626 arity: BuiltinParamArity::Required,
627 default: None,
628 description: "DataTransaction receiver.",
629 },
630 BuiltinParamDescriptor {
631 name: "arrayName",
632 ty: BuiltinParamType::StringScalar,
633 arity: BuiltinParamArity::Required,
634 default: None,
635 description: "New array name.",
636 },
637 BuiltinParamDescriptor {
638 name: "meta",
639 ty: BuiltinParamType::Any,
640 arity: BuiltinParamArity::Required,
641 default: None,
642 description: "Array metadata struct.",
643 },
644];
645
646const IN_TX_COMMIT_REST: [BuiltinParamDescriptor; 2] = [
647 BuiltinParamDescriptor {
648 name: "tx",
649 ty: BuiltinParamType::Any,
650 arity: BuiltinParamArity::Required,
651 default: None,
652 description: "DataTransaction receiver.",
653 },
654 BuiltinParamDescriptor {
655 name: "options",
656 ty: BuiltinParamType::Any,
657 arity: BuiltinParamArity::Variadic,
658 default: None,
659 description: "Commit options (e.g. if_manifest).",
660 },
661];
662
/// Declares a builtin descriptor that has exactly one call signature.
///
/// Expands to two items:
/// * a private one-element signature table named `$signatures`, built from
///   `$label`, `$inputs` and `$outputs`;
/// * a `pub const $descriptor` that points at that table, uses
///   `BuiltinOutputMode::Fixed` and `BuiltinCompletionPolicy::Public`, and
///   attaches the shared `DATA_DESCRIPTOR_ERRORS` set.
macro_rules! one_sig_descriptor {
    ($descriptor:ident, $signatures:ident, $label:expr, $inputs:expr, $outputs:expr) => {
        const $signatures: [BuiltinSignatureDescriptor; 1] = [BuiltinSignatureDescriptor {
            label: $label,
            inputs: $inputs,
            outputs: $outputs,
        }];
        pub const $descriptor: BuiltinDescriptor = BuiltinDescriptor {
            signatures: &$signatures,
            output_mode: BuiltinOutputMode::Fixed,
            completion_policy: BuiltinCompletionPolicy::Public,
            errors: &DATA_DESCRIPTOR_ERRORS,
        };
    };
}
678
// ---- `data.*` namespace descriptors ----------------------------------------

// data.create: path + schema (+ reserved options) -> Dataset handle.
one_sig_descriptor!(
    DATA_CREATE_DESCRIPTOR,
    DATA_CREATE_SIGS,
    "ds = data.create(path, schema, Name, Value, ...)",
    &IN_PATH_SCHEMA_REST,
    &OUT_DATASET
);
// data.open: path (+ reserved options) -> Dataset handle.
one_sig_descriptor!(
    DATA_OPEN_DESCRIPTOR,
    DATA_OPEN_SIGS,
    "ds = data.open(path, Name, Value, ...)",
    &IN_PATH_REST,
    &OUT_DATASET
);
693one_sig_descriptor!(
694 DATA_EXISTS_DESCRIPTOR,
695 DATA_EXISTS_SIGS,
696 "tf = data.exists(path)",
697 &IN_PATH_REST,
698 &OUT_BOOL
699);
// data.delete: path (+ reserved options) -> logical flag.
one_sig_descriptor!(
    DATA_DELETE_DESCRIPTOR,
    DATA_DELETE_SIGS,
    "tf = data.delete(path, Name, Value, ...)",
    &IN_PATH_REST,
    &OUT_BOOL
);
// data.copy: fromPath + toPath (+ reserved options) -> logical flag.
one_sig_descriptor!(
    DATA_COPY_DESCRIPTOR,
    DATA_COPY_SIGS,
    "tf = data.copy(fromPath, toPath, Name, Value, ...)",
    &IN_FROM_TO_REST,
    &OUT_BOOL
);
// data.move: fromPath + toPath (+ reserved options) -> logical flag.
one_sig_descriptor!(
    DATA_MOVE_DESCRIPTOR,
    DATA_MOVE_SIGS,
    "tf = data.move(fromPath, toPath, Name, Value, ...)",
    &IN_FROM_TO_REST,
    &OUT_BOOL
);
721one_sig_descriptor!(
722 DATA_IMPORT_DESCRIPTOR,
723 DATA_IMPORT_SIGS,
724 "ds = data.import(path, format, sourcePath, Name, Value, ...)",
725 &IN_PATH_FORMAT_TARGET_REST,
726 &OUT_DATASET
727);
// data.export: path + format + targetPath (+ reserved options) -> logical flag.
one_sig_descriptor!(
    DATA_EXPORT_DESCRIPTOR,
    DATA_EXPORT_SIGS,
    "tf = data.export(path, format, targetPath, Name, Value, ...)",
    &IN_PATH_FORMAT_TARGET_REST,
    &OUT_BOOL
);
// data.list: prefix (+ reserved options) -> cell array.
one_sig_descriptor!(
    DATA_LIST_DESCRIPTOR,
    DATA_LIST_SIGS,
    "C = data.list(prefix, Name, Value, ...)",
    &IN_PREFIX_REST,
    &OUT_CELL
);
// data.inspect: path -> struct.
// NOTE(review): the label shows only `path`, while `IN_PATH_REST` also
// declares variadic options — confirm against the builtin's signature.
one_sig_descriptor!(
    DATA_INSPECT_DESCRIPTOR,
    DATA_INSPECT_SIGS,
    "S = data.inspect(path)",
    &IN_PATH_REST,
    &OUT_STRUCT
);
749
// ---- `Dataset` method descriptors ------------------------------------------

// Dataset.path: receiver -> string.
one_sig_descriptor!(
    DATASET_PATH_DESCRIPTOR,
    DATASET_PATH_SIGS,
    "path = Dataset.path(ds)",
    &IN_BASE,
    &OUT_STRING
);
// Dataset.id: receiver -> string.
one_sig_descriptor!(
    DATASET_ID_DESCRIPTOR,
    DATASET_ID_SIGS,
    "id = Dataset.id(ds)",
    &IN_BASE,
    &OUT_STRING
);
// Dataset.version: receiver -> string.
one_sig_descriptor!(
    DATASET_VERSION_DESCRIPTOR,
    DATASET_VERSION_SIGS,
    "version = Dataset.version(ds)",
    &IN_BASE,
    &OUT_STRING
);
// Dataset.arrays: receiver -> cell array.
one_sig_descriptor!(
    DATASET_ARRAYS_DESCRIPTOR,
    DATASET_ARRAYS_SIGS,
    "C = Dataset.arrays(ds)",
    &IN_BASE,
    &OUT_CELL
);
// Dataset.has_array: receiver + name -> logical flag.
one_sig_descriptor!(
    DATASET_HAS_ARRAY_DESCRIPTOR,
    DATASET_HAS_ARRAY_SIGS,
    "tf = Dataset.has_array(ds, name)",
    &IN_BASE_NAME,
    &OUT_BOOL
);
// Dataset.array: receiver + name -> DataArray handle.
one_sig_descriptor!(
    DATASET_ARRAY_DESCRIPTOR,
    DATASET_ARRAY_SIGS,
    "arr = Dataset.array(ds, name)",
    &IN_BASE_NAME,
    &OUT_ARRAY
);
// Dataset.attrs: receiver -> struct.
one_sig_descriptor!(
    DATASET_ATTRS_DESCRIPTOR,
    DATASET_ATTRS_SIGS,
    "S = Dataset.attrs(ds)",
    &IN_BASE,
    &OUT_STRUCT
);
// Dataset.get_attr: receiver + key + optional defaultValue -> value.
one_sig_descriptor!(
    DATASET_GET_ATTR_DESCRIPTOR,
    DATASET_GET_ATTR_SIGS,
    "value = Dataset.get_attr(ds, key, defaultValue)",
    &IN_BASE_KEY_DEFAULT,
    &OUT_VALUE
);
// Dataset.set_attr: receiver + key + value -> logical flag.
one_sig_descriptor!(
    DATASET_SET_ATTR_DESCRIPTOR,
    DATASET_SET_ATTR_SIGS,
    "tf = Dataset.set_attr(ds, key, value)",
    &IN_BASE_KEY_VALUE,
    &OUT_BOOL
);
// Dataset.set_attrs: receiver + attrs struct -> logical flag.
one_sig_descriptor!(
    DATASET_SET_ATTRS_DESCRIPTOR,
    DATASET_SET_ATTRS_SIGS,
    "tf = Dataset.set_attrs(ds, attrs)",
    &IN_BASE_ATTRS,
    &OUT_BOOL
);
// Dataset.begin: receiver (+ reserved options) -> DataTransaction handle.
one_sig_descriptor!(
    DATASET_BEGIN_DESCRIPTOR,
    DATASET_BEGIN_SIGS,
    "tx = Dataset.begin(ds, Name, Value, ...)",
    &IN_BASE_REST,
    &OUT_TX
);
// Dataset.snapshot: receiver + label (+ reserved options) -> string.
one_sig_descriptor!(
    DATASET_SNAPSHOT_DESCRIPTOR,
    DATASET_SNAPSHOT_SIGS,
    "snapshotPath = Dataset.snapshot(ds, label, Name, Value, ...)",
    &IN_BASE_LABEL_REST,
    &OUT_STRING
);
// Dataset.refresh: receiver -> Dataset handle.
one_sig_descriptor!(
    DATASET_REFRESH_DESCRIPTOR,
    DATASET_REFRESH_SIGS,
    "ds = Dataset.refresh(ds)",
    &IN_BASE,
    &OUT_DATASET
);
841
// ---- `DataArray` method descriptors ----------------------------------------

// DataArray.name: receiver -> string.
one_sig_descriptor!(
    DATAARRAY_NAME_DESCRIPTOR,
    DATAARRAY_NAME_SIGS,
    "name = DataArray.name(arr)",
    &IN_BASE,
    &OUT_STRING
);
// DataArray.dtype: receiver -> string.
one_sig_descriptor!(
    DATAARRAY_DTYPE_DESCRIPTOR,
    DATAARRAY_DTYPE_SIGS,
    "dtype = DataArray.dtype(arr)",
    &IN_BASE,
    &OUT_STRING
);
// DataArray.shape: receiver -> numeric tensor.
one_sig_descriptor!(
    DATAARRAY_SHAPE_DESCRIPTOR,
    DATAARRAY_SHAPE_SIGS,
    "shape = DataArray.shape(arr)",
    &IN_BASE,
    &OUT_TENSOR
);
// DataArray.rank: receiver -> value.
one_sig_descriptor!(
    DATAARRAY_RANK_DESCRIPTOR,
    DATAARRAY_RANK_SIGS,
    "rank = DataArray.rank(arr)",
    &IN_BASE,
    &OUT_VALUE
);
// DataArray.chunk_shape: receiver -> numeric tensor.
one_sig_descriptor!(
    DATAARRAY_CHUNK_SHAPE_DESCRIPTOR,
    DATAARRAY_CHUNK_SHAPE_SIGS,
    "chunkShape = DataArray.chunk_shape(arr)",
    &IN_BASE,
    &OUT_TENSOR
);
// DataArray.codec: receiver -> string.
one_sig_descriptor!(
    DATAARRAY_CODEC_DESCRIPTOR,
    DATAARRAY_CODEC_SIGS,
    "codec = DataArray.codec(arr)",
    &IN_BASE,
    &OUT_STRING
);
// DataArray.read: receiver + optional sliceSpec -> numeric tensor.
one_sig_descriptor!(
    DATAARRAY_READ_DESCRIPTOR,
    DATAARRAY_READ_SIGS,
    "X = DataArray.read(arr, sliceSpec)",
    &IN_BASE_SLICE_OPTIONAL,
    &OUT_TENSOR
);
// DataArray.write has two call forms (whole array, or slice + values), so its
// signature table is written out by hand instead of via `one_sig_descriptor!`.
const DATAARRAY_WRITE_SIGS: [BuiltinSignatureDescriptor; 2] = [
    BuiltinSignatureDescriptor {
        label: "tf = DataArray.write(arr, values)",
        inputs: &IN_BASE_VALUES,
        outputs: &OUT_BOOL,
    },
    BuiltinSignatureDescriptor {
        label: "tf = DataArray.write(arr, sliceSpec, values)",
        inputs: &IN_BASE_SLICE_VALUES,
        outputs: &OUT_BOOL,
    },
];
/// Descriptor for `DataArray.write`, covering both call forms listed in
/// `DATAARRAY_WRITE_SIGS`.
pub const DATAARRAY_WRITE_DESCRIPTOR: BuiltinDescriptor = BuiltinDescriptor {
    signatures: &DATAARRAY_WRITE_SIGS,
    output_mode: BuiltinOutputMode::Fixed,
    completion_policy: BuiltinCompletionPolicy::Public,
    errors: &DATA_DESCRIPTOR_ERRORS,
};
// DataArray.resize: receiver + newShape (+ reserved options) -> logical flag.
one_sig_descriptor!(
    DATAARRAY_RESIZE_DESCRIPTOR,
    DATAARRAY_RESIZE_SIGS,
    "tf = DataArray.resize(arr, newShape, Name, Value, ...)",
    &IN_BASE_NEW_SHAPE_REST,
    &OUT_BOOL
);
// DataArray.fill: receiver + value (+ reserved options) -> logical flag.
one_sig_descriptor!(
    DATAARRAY_FILL_DESCRIPTOR,
    DATAARRAY_FILL_SIGS,
    "tf = DataArray.fill(arr, value, Name, Value, ...)",
    &IN_BASE_VALUE_REST,
    &OUT_BOOL
);
923
// ---- `DataTransaction` method descriptors ----------------------------------

// DataTransaction.id: receiver -> string.
one_sig_descriptor!(
    DATATX_ID_DESCRIPTOR,
    DATATX_ID_SIGS,
    "id = DataTransaction.id(tx)",
    &IN_BASE,
    &OUT_STRING
);
// DataTransaction.write: tx + arrayName + sliceSpec + values (+ reserved
// options) -> logical flag.
// NOTE(review): the signature table is named `DATATX_WRITE_SIGS_1`, unlike
// the `*_SIGS` naming used by every other descriptor here — confirm whether
// the suffix is intentional.
one_sig_descriptor!(
    DATATX_WRITE_DESCRIPTOR,
    DATATX_WRITE_SIGS_1,
    "tf = DataTransaction.write(tx, arrayName, sliceSpec, values, Name, Value, ...)",
    &IN_TX_WRITE,
    &OUT_BOOL
);
// DataTransaction.set_attr: receiver + key + value -> logical flag.
one_sig_descriptor!(
    DATATX_SET_ATTR_DESCRIPTOR,
    DATATX_SET_ATTR_SIGS,
    "tf = DataTransaction.set_attr(tx, key, value)",
    &IN_BASE_KEY_VALUE,
    &OUT_BOOL
);
// DataTransaction.set_attrs: receiver + attrs struct -> logical flag.
one_sig_descriptor!(
    DATATX_SET_ATTRS_DESCRIPTOR,
    DATATX_SET_ATTRS_SIGS,
    "tf = DataTransaction.set_attrs(tx, attrs)",
    &IN_BASE_ATTRS,
    &OUT_BOOL
);
// DataTransaction.resize: tx + arrayName + newShape (+ reserved options)
// -> logical flag.
one_sig_descriptor!(
    DATATX_RESIZE_DESCRIPTOR,
    DATATX_RESIZE_SIGS,
    "tf = DataTransaction.resize(tx, arrayName, newShape, Name, Value, ...)",
    &IN_TX_ARRAY_SHAPE_REST,
    &OUT_BOOL
);
// DataTransaction.fill: tx + arrayName + value + optional sliceSpec
// -> logical flag.
one_sig_descriptor!(
    DATATX_FILL_DESCRIPTOR,
    DATATX_FILL_SIGS,
    "tf = DataTransaction.fill(tx, arrayName, value, sliceSpec)",
    &IN_TX_ARRAY_VALUE_SLICE_OPT,
    &OUT_BOOL
);
// DataTransaction.delete_array: tx + arrayName -> logical flag.
one_sig_descriptor!(
    DATATX_DELETE_ARRAY_DESCRIPTOR,
    DATATX_DELETE_ARRAY_SIGS,
    "tf = DataTransaction.delete_array(tx, arrayName)",
    &IN_TX_ARRAY_NAME,
    &OUT_BOOL
);
// DataTransaction.create_array: tx + arrayName + meta -> logical flag.
one_sig_descriptor!(
    DATATX_CREATE_ARRAY_DESCRIPTOR,
    DATATX_CREATE_ARRAY_SIGS,
    "tf = DataTransaction.create_array(tx, arrayName, meta)",
    &IN_TX_ARRAY_META,
    &OUT_BOOL
);
// DataTransaction.commit: tx (+ commit options) -> logical flag.
one_sig_descriptor!(
    DATATX_COMMIT_DESCRIPTOR,
    DATATX_COMMIT_SIGS,
    "tf = DataTransaction.commit(tx, Name, Value, ...)",
    &IN_TX_COMMIT_REST,
    &OUT_BOOL
);
// Free-function form `commit(tx, ...)`; same inputs and outputs as
// `DATATX_COMMIT_DESCRIPTOR`.
one_sig_descriptor!(
    COMMIT_ALIAS_DESCRIPTOR,
    COMMIT_ALIAS_SIGS,
    "tf = commit(tx, Name, Value, ...)",
    &IN_TX_COMMIT_REST,
    &OUT_BOOL
);
// DataTransaction.abort: receiver -> logical flag.
one_sig_descriptor!(
    DATATX_ABORT_DESCRIPTOR,
    DATATX_ABORT_SIGS,
    "tf = DataTransaction.abort(tx)",
    &IN_BASE,
    &OUT_BOOL
);
// DataTransaction.status: receiver -> string.
one_sig_descriptor!(
    DATATX_STATUS_DESCRIPTOR,
    DATATX_STATUS_SIGS,
    "status = DataTransaction.status(tx)",
    &IN_BASE,
    &OUT_STRING
);
1008
1009#[runtime_builtin(
1010 name = "data.create",
1011 category = "io/data",
1012 summary = "Create a typed dataset at a .data path.",
1013 keywords = "data,dataset,create,persistence",
1014 sink = true,
1015 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1016 descriptor(crate::builtins::io::data::DATA_CREATE_DESCRIPTOR),
1017 builtin_path = "crate::builtins::io::data"
1018)]
1019async fn data_create_builtin(
1020 path: Value,
1021 schema: Value,
1022 _rest: Vec<Value>,
1023) -> BuiltinResult<Value> {
1024 let path = parse_string(&path, "data.create path")?;
1025 let root = dataset_root(&path);
1026 let schema = parse_schema(&schema)?;
1027 let now = now_rfc3339();
1028 let mut arrays = BTreeMap::new();
1029 for (name, mut meta) in schema.arrays {
1030 let total = meta.shape.iter().copied().product::<usize>();
1031 let payload = DataArrayPayload {
1032 dtype: meta.dtype.clone(),
1033 shape: meta.shape.clone(),
1034 values: vec![0.0; total],
1035 };
1036 let (payload_path, chunk_index_path) =
1037 write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1038 meta.data_path = make_rel_data_path(&root, &payload_path)?;
1039 meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1040 arrays.insert(name, meta);
1041 }
1042
1043 let manifest = DataManifest {
1044 schema_version: 1,
1045 format: "runmat-data".to_string(),
1046 dataset_id: crate::data::new_dataset_id(),
1047 name: root.file_name().map(|v| v.to_string_lossy().to_string()),
1048 created_at: now.clone(),
1049 updated_at: now,
1050 arrays,
1051 attrs: BTreeMap::new(),
1052 txn_sequence: 0,
1053 };
1054 write_manifest_async(&root, &manifest).await?;
1055 Ok(dataset_object(&path, &manifest))
1056}
1057
1058#[runtime_builtin(
1059 name = "data.open",
1060 category = "io/data",
1061 summary = "Open a dataset handle from a .data path.",
1062 keywords = "data,dataset,open,persistence",
1063 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1064 descriptor(crate::builtins::io::data::DATA_OPEN_DESCRIPTOR),
1065 builtin_path = "crate::builtins::io::data"
1066)]
1067async fn data_open_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1068 let path = parse_string(&path, "data.open path")?;
1069 let root = dataset_root(&path);
1070 let manifest = read_manifest_async(&root).await?;
1071 let mut ds = dataset_object(&path, &manifest);
1072 hydrate_dataset_descriptor_async(&path, &mut ds).await;
1073 Ok(ds)
1074}
1075
1076#[runtime_builtin(
1077 name = "data.exists",
1078 category = "io/data",
1079 summary = "Check if dataset exists.",
1080 keywords = "data,dataset,exists",
1081 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1082 descriptor(crate::builtins::io::data::DATA_EXISTS_DESCRIPTOR),
1083 builtin_path = "crate::builtins::io::data"
1084)]
1085async fn data_exists_builtin(path: Value) -> BuiltinResult<Value> {
1086 let path = parse_string(&path, "data.exists path")?;
1087 let root = dataset_root(&path);
1088 let exists = runmat_filesystem::metadata_async(manifest_path(&root))
1089 .await
1090 .is_ok();
1091 Ok(Value::Bool(exists))
1092}
1093
1094#[runtime_builtin(
1095 name = "data.delete",
1096 category = "io/data",
1097 summary = "Delete a dataset path.",
1098 keywords = "data,dataset,delete",
1099 sink = true,
1100 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1101 descriptor(crate::builtins::io::data::DATA_DELETE_DESCRIPTOR),
1102 builtin_path = "crate::builtins::io::data"
1103)]
1104async fn data_delete_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1105 let path = parse_string(&path, "data.delete path")?;
1106 let root = dataset_root(&path);
1107 runmat_filesystem::remove_dir_all_async(&root)
1108 .await
1109 .map_err(|err| {
1110 data_error(format!(
1111 "data.delete: failed to remove '{}': {err}",
1112 root.display()
1113 ))
1114 })?;
1115 Ok(Value::Bool(true))
1116}
1117
1118#[runtime_builtin(
1119 name = "data.copy",
1120 category = "io/data",
1121 summary = "Copy dataset to new path.",
1122 keywords = "data,dataset,copy",
1123 sink = true,
1124 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1125 descriptor(crate::builtins::io::data::DATA_COPY_DESCRIPTOR),
1126 builtin_path = "crate::builtins::io::data"
1127)]
1128async fn data_copy_builtin(
1129 from_path: Value,
1130 to_path: Value,
1131 _rest: Vec<Value>,
1132) -> BuiltinResult<Value> {
1133 let from = parse_string(&from_path, "data.copy fromPath")?;
1134 let to = parse_string(&to_path, "data.copy toPath")?;
1135 copy_dir_recursive(&dataset_root(&from), &dataset_root(&to)).await?;
1136 Ok(Value::Bool(true))
1137}
1138
1139#[runtime_builtin(
1140 name = "data.move",
1141 category = "io/data",
1142 summary = "Move dataset to new path.",
1143 keywords = "data,dataset,move",
1144 sink = true,
1145 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1146 descriptor(crate::builtins::io::data::DATA_MOVE_DESCRIPTOR),
1147 builtin_path = "crate::builtins::io::data"
1148)]
1149async fn data_move_builtin(
1150 from_path: Value,
1151 to_path: Value,
1152 _rest: Vec<Value>,
1153) -> BuiltinResult<Value> {
1154 let from = parse_string(&from_path, "data.move fromPath")?;
1155 let to = parse_string(&to_path, "data.move toPath")?;
1156 runmat_filesystem::rename_async(dataset_root(&from), dataset_root(&to))
1157 .await
1158 .map_err(|err| {
1159 data_error(format!(
1160 "data.move: failed to move dataset '{from}' -> '{to}': {err}"
1161 ))
1162 })?;
1163 Ok(Value::Bool(true))
1164}
1165
1166#[runtime_builtin(
1167 name = "data.import",
1168 category = "io/data",
1169 summary = "Import an existing dataset file path.",
1170 keywords = "data,dataset,import",
1171 sink = true,
1172 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1173 descriptor(crate::builtins::io::data::DATA_IMPORT_DESCRIPTOR),
1174 builtin_path = "crate::builtins::io::data"
1175)]
1176async fn data_import_builtin(
1177 path: Value,
1178 format: Value,
1179 source_path: Value,
1180 _rest: Vec<Value>,
1181) -> BuiltinResult<Value> {
1182 let path = parse_string(&path, "data.import path")?;
1183 let format = parse_string(&format, "data.import format")?;
1184 if !format.eq_ignore_ascii_case("data") {
1185 return Err(data_error(
1186 "data.import currently supports only format='data'",
1187 ));
1188 }
1189 let source_path = parse_string(&source_path, "data.import sourcePath")?;
1190 copy_dir_recursive(&dataset_root(&source_path), &dataset_root(&path)).await?;
1191 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1192 Ok(dataset_object(&path, &manifest))
1193}
1194
1195#[runtime_builtin(
1196 name = "data.export",
1197 category = "io/data",
1198 summary = "Export dataset to target path.",
1199 keywords = "data,dataset,export",
1200 sink = true,
1201 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1202 descriptor(crate::builtins::io::data::DATA_EXPORT_DESCRIPTOR),
1203 builtin_path = "crate::builtins::io::data"
1204)]
1205async fn data_export_builtin(
1206 path: Value,
1207 format: Value,
1208 target_path: Value,
1209 _rest: Vec<Value>,
1210) -> BuiltinResult<Value> {
1211 let path = parse_string(&path, "data.export path")?;
1212 let format = parse_string(&format, "data.export format")?;
1213 if !format.eq_ignore_ascii_case("data") {
1214 return Err(data_error(
1215 "data.export currently supports only format='data'",
1216 ));
1217 }
1218 let target_path = parse_string(&target_path, "data.export targetPath")?;
1219 copy_dir_recursive(&dataset_root(&path), &dataset_root(&target_path)).await?;
1220 Ok(Value::Bool(true))
1221}
1222
1223#[runtime_builtin(
1224 name = "data.list",
1225 category = "io/data",
1226 summary = "List dataset paths under a prefix.",
1227 keywords = "data,dataset,list",
1228 type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
1229 descriptor(crate::builtins::io::data::DATA_LIST_DESCRIPTOR),
1230 builtin_path = "crate::builtins::io::data"
1231)]
1232async fn data_list_builtin(path_prefix: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1233 let prefix = parse_string(&path_prefix, "data.list prefix")?;
1234 let root = PathBuf::from(prefix);
1235 let entries = runmat_filesystem::read_dir_async(&root)
1236 .await
1237 .map_err(|err| {
1238 data_error(format!(
1239 "data.list: failed to read '{}': {err}",
1240 root.display()
1241 ))
1242 })?;
1243 let mut values = Vec::new();
1244 for entry in entries {
1245 if !entry.is_dir() {
1246 continue;
1247 }
1248 let candidate = entry.path();
1249 if candidate.extension().and_then(|s| s.to_str()) != Some("data") {
1250 continue;
1251 }
1252 if runmat_filesystem::metadata_async(candidate.join("manifest.json"))
1253 .await
1254 .is_ok()
1255 {
1256 values.push(Value::String(candidate.to_string_lossy().to_string()));
1257 }
1258 }
1259 let cols = values.len();
1260 make_cell(values, 1, cols).map_err(data_error)
1261}
1262
1263#[runtime_builtin(
1264 name = "data.inspect",
1265 category = "io/data",
1266 summary = "Inspect dataset metadata and schema fields.",
1267 keywords = "data,dataset,inspect,schema",
1268 type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
1269 descriptor(crate::builtins::io::data::DATA_INSPECT_DESCRIPTOR),
1270 builtin_path = "crate::builtins::io::data"
1271)]
1272async fn data_inspect_builtin(path: Value) -> BuiltinResult<Value> {
1273 let path = parse_string(&path, "data.inspect path")?;
1274 let root = dataset_root(&path);
1275 let manifest = read_manifest_async(&root).await?;
1276 let mut out = StructValue::new();
1277 out.fields.insert("path".to_string(), Value::String(path));
1278 out.fields
1279 .insert("id".to_string(), Value::String(manifest.dataset_id));
1280 out.fields.insert(
1281 "arrayCount".to_string(),
1282 Value::Num(manifest.arrays.len() as f64),
1283 );
1284 out.fields
1285 .insert("updatedAt".to_string(), Value::String(manifest.updated_at));
1286 Ok(Value::Struct(out))
1287}
1288
1289#[runtime_builtin(
1290 name = "Dataset.path",
1291 category = "io/data",
1292 summary = "Return dataset path.",
1293 keywords = "dataset,path",
1294 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1295 descriptor(crate::builtins::io::data::DATASET_PATH_DESCRIPTOR),
1296 builtin_path = "crate::builtins::io::data"
1297)]
1298async fn dataset_path_builtin(base: Value) -> BuiltinResult<Value> {
1299 let obj = as_object(&base, "Dataset.path")?;
1300 Ok(get_object_prop(obj, "__data_path")?.clone())
1301}
1302
1303#[runtime_builtin(
1304 name = "Dataset.id",
1305 category = "io/data",
1306 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1307 descriptor(crate::builtins::io::data::DATASET_ID_DESCRIPTOR),
1308 builtin_path = "crate::builtins::io::data"
1309)]
1310async fn dataset_id_builtin(base: Value) -> BuiltinResult<Value> {
1311 let obj = as_object(&base, "Dataset.id")?;
1312 Ok(get_object_prop(obj, "__data_id")?.clone())
1313}
1314
1315#[runtime_builtin(
1316 name = "Dataset.version",
1317 category = "io/data",
1318 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1319 descriptor(crate::builtins::io::data::DATASET_VERSION_DESCRIPTOR),
1320 builtin_path = "crate::builtins::io::data"
1321)]
1322async fn dataset_version_builtin(base: Value) -> BuiltinResult<Value> {
1323 let obj = as_object(&base, "Dataset.version")?;
1324 Ok(get_object_prop(obj, "__data_version")?.clone())
1325}
1326
1327#[runtime_builtin(
1328 name = "Dataset.arrays",
1329 category = "io/data",
1330 type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
1331 descriptor(crate::builtins::io::data::DATASET_ARRAYS_DESCRIPTOR),
1332 builtin_path = "crate::builtins::io::data"
1333)]
1334async fn dataset_arrays_builtin(base: Value) -> BuiltinResult<Value> {
1335 let path = dataset_path_from_object(&base, "Dataset.arrays")?;
1336 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1337 let values: Vec<Value> = manifest
1338 .arrays
1339 .keys()
1340 .map(|k| Value::String(k.clone()))
1341 .collect();
1342 make_cell(values.clone(), 1, values.len()).map_err(data_error)
1343}
1344
1345#[runtime_builtin(
1346 name = "Dataset.has_array",
1347 category = "io/data",
1348 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1349 descriptor(crate::builtins::io::data::DATASET_HAS_ARRAY_DESCRIPTOR),
1350 builtin_path = "crate::builtins::io::data"
1351)]
1352async fn dataset_has_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
1353 let path = dataset_path_from_object(&base, "Dataset.has_array")?;
1354 let name = parse_string(&name, "Dataset.has_array name")?;
1355 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1356 Ok(Value::Bool(manifest.arrays.contains_key(&name)))
1357}
1358
1359#[runtime_builtin(
1360 name = "Dataset.array",
1361 category = "io/data",
1362 type_resolver(crate::builtins::io::type_resolvers::data_array_type),
1363 descriptor(crate::builtins::io::data::DATASET_ARRAY_DESCRIPTOR),
1364 builtin_path = "crate::builtins::io::data"
1365)]
1366async fn dataset_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
1367 let path = dataset_path_from_object(&base, "Dataset.array")?;
1368 let name = parse_string(&name, "Dataset.array name")?;
1369 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1370 if !manifest.arrays.contains_key(&name) {
1371 return Err(data_error(format!(
1372 "Dataset.array: array '{name}' not found"
1373 )));
1374 }
1375 Ok(array_object(&path, &name))
1376}
1377
1378#[runtime_builtin(
1379 name = "Dataset.attrs",
1380 category = "io/data",
1381 type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
1382 descriptor(crate::builtins::io::data::DATASET_ATTRS_DESCRIPTOR),
1383 builtin_path = "crate::builtins::io::data"
1384)]
1385async fn dataset_attrs_builtin(base: Value) -> BuiltinResult<Value> {
1386 let path = dataset_path_from_object(&base, "Dataset.attrs")?;
1387 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1388 Ok(attrs_to_struct(&manifest.attrs))
1389}
1390
1391#[runtime_builtin(
1392 name = "Dataset.get_attr",
1393 category = "io/data",
1394 type_resolver(crate::builtins::io::type_resolvers::data_unknown_type),
1395 descriptor(crate::builtins::io::data::DATASET_GET_ATTR_DESCRIPTOR),
1396 builtin_path = "crate::builtins::io::data"
1397)]
1398async fn dataset_get_attr_builtin(
1399 base: Value,
1400 key: Value,
1401 rest: Vec<Value>,
1402) -> BuiltinResult<Value> {
1403 let path = dataset_path_from_object(&base, "Dataset.get_attr")?;
1404 let key = parse_string(&key, "Dataset.get_attr key")?;
1405 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1406 if let Some(value) = manifest.attrs.get(&key) {
1407 return Ok(json_to_value(value));
1408 }
1409 Ok(rest.first().cloned().unwrap_or(Value::Num(0.0)))
1410}
1411
1412#[runtime_builtin(
1413 name = "Dataset.set_attr",
1414 category = "io/data",
1415 sink = true,
1416 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1417 descriptor(crate::builtins::io::data::DATASET_SET_ATTR_DESCRIPTOR),
1418 builtin_path = "crate::builtins::io::data"
1419)]
1420async fn dataset_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
1421 let path = dataset_path_from_object(&base, "Dataset.set_attr")?;
1422 let key = parse_string(&key, "Dataset.set_attr key")?;
1423 let root = dataset_root(&path);
1424 let mut manifest = read_manifest_async(&root).await?;
1425 manifest.attrs.insert(key, value_to_json(&value));
1426 manifest.updated_at = now_rfc3339();
1427 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1428 write_manifest_async(&root, &manifest).await?;
1429 Ok(Value::Bool(true))
1430}
1431
1432#[runtime_builtin(
1433 name = "Dataset.set_attrs",
1434 category = "io/data",
1435 sink = true,
1436 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1437 descriptor(crate::builtins::io::data::DATASET_SET_ATTRS_DESCRIPTOR),
1438 builtin_path = "crate::builtins::io::data"
1439)]
1440async fn dataset_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
1441 let path = dataset_path_from_object(&base, "Dataset.set_attrs")?;
1442 let Value::Struct(incoming) = attrs else {
1443 return Err(data_error("Dataset.set_attrs: attrs must be a struct"));
1444 };
1445 let root = dataset_root(&path);
1446 let mut manifest = read_manifest_async(&root).await?;
1447 for (k, v) in incoming.fields {
1448 manifest.attrs.insert(k, value_to_json(&v));
1449 }
1450 manifest.updated_at = now_rfc3339();
1451 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1452 write_manifest_async(&root, &manifest).await?;
1453 Ok(Value::Bool(true))
1454}
1455
1456#[runtime_builtin(
1457 name = "Dataset.begin",
1458 category = "io/data",
1459 type_resolver(crate::builtins::io::type_resolvers::data_tx_type),
1460 descriptor(crate::builtins::io::data::DATASET_BEGIN_DESCRIPTOR),
1461 builtin_path = "crate::builtins::io::data"
1462)]
1463async fn dataset_begin_builtin(base: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1464 let path = dataset_path_from_object(&base, "Dataset.begin")?;
1465 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1466 let tx_id = start_tx(path.clone(), manifest.txn_sequence);
1467 tracing::info!(
1468 target: "runmat.data",
1469 dataset = path,
1470 tx_id = tx_id,
1471 base_sequence = manifest.txn_sequence,
1472 "data transaction begin"
1473 );
1474 Ok(transaction_object(&path, &tx_id))
1475}
1476
1477#[runtime_builtin(
1478 name = "Dataset.snapshot",
1479 category = "io/data",
1480 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1481 descriptor(crate::builtins::io::data::DATASET_SNAPSHOT_DESCRIPTOR),
1482 builtin_path = "crate::builtins::io::data"
1483)]
1484async fn dataset_snapshot_builtin(
1485 base: Value,
1486 label: Value,
1487 _rest: Vec<Value>,
1488) -> BuiltinResult<Value> {
1489 let path = dataset_path_from_object(&base, "Dataset.snapshot")?;
1490 let label = parse_string(&label, "Dataset.snapshot label")?;
1491 let root = dataset_root(&path);
1492 let snapshots = root.join(".snapshots");
1493 runmat_filesystem::create_dir_all_async(&snapshots)
1494 .await
1495 .map_err(|err| {
1496 data_error(format!(
1497 "Dataset.snapshot: failed to create snapshots dir: {err}"
1498 ))
1499 })?;
1500 let src = manifest_path(&root);
1501 let dst = snapshots.join(format!("{}.manifest.json", sanitize_label(&label)));
1502 copy_file(&src, &dst).await?;
1503 Ok(Value::String(dst.to_string_lossy().to_string()))
1504}
1505
1506#[runtime_builtin(
1507 name = "Dataset.refresh",
1508 category = "io/data",
1509 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1510 descriptor(crate::builtins::io::data::DATASET_REFRESH_DESCRIPTOR),
1511 builtin_path = "crate::builtins::io::data"
1512)]
1513async fn dataset_refresh_builtin(base: Value) -> BuiltinResult<Value> {
1514 let path = dataset_path_from_object(&base, "Dataset.refresh")?;
1515 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1516 let mut ds = dataset_object(&path, &manifest);
1517 hydrate_dataset_descriptor_async(&path, &mut ds).await;
1518 Ok(ds)
1519}
1520
1521#[runtime_builtin(
1522 name = "DataArray.name",
1523 category = "io/data",
1524 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1525 descriptor(crate::builtins::io::data::DATAARRAY_NAME_DESCRIPTOR),
1526 builtin_path = "crate::builtins::io::data"
1527)]
1528async fn data_array_name_builtin(base: Value) -> BuiltinResult<Value> {
1529 let obj = as_object(&base, "DataArray.name")?;
1530 Ok(get_object_prop(obj, "__array_name")?.clone())
1531}
1532
1533#[runtime_builtin(
1534 name = "DataArray.dtype",
1535 category = "io/data",
1536 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1537 descriptor(crate::builtins::io::data::DATAARRAY_DTYPE_DESCRIPTOR),
1538 builtin_path = "crate::builtins::io::data"
1539)]
1540async fn data_array_dtype_builtin(base: Value) -> BuiltinResult<Value> {
1541 let (path, name) = array_identity(&base, "DataArray.dtype")?;
1542 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1543 let meta = manifest
1544 .arrays
1545 .get(&name)
1546 .ok_or_else(|| data_error(format!("DataArray.dtype: array '{name}' not found")))?;
1547 Ok(Value::String(meta.dtype.clone()))
1548}
1549
1550#[runtime_builtin(
1551 name = "DataArray.shape",
1552 category = "io/data",
1553 type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
1554 descriptor(crate::builtins::io::data::DATAARRAY_SHAPE_DESCRIPTOR),
1555 builtin_path = "crate::builtins::io::data"
1556)]
1557async fn data_array_shape_builtin(base: Value) -> BuiltinResult<Value> {
1558 let (path, name) = array_identity(&base, "DataArray.shape")?;
1559 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1560 let meta = manifest
1561 .arrays
1562 .get(&name)
1563 .ok_or_else(|| data_error(format!("DataArray.shape: array '{name}' not found")))?;
1564 let values = meta.shape.iter().map(|v| *v as f64).collect::<Vec<_>>();
1565 let tensor = Tensor::new(values, vec![1, meta.shape.len()])
1566 .map_err(|err| data_error(format!("DataArray.shape: {err}")))?;
1567 Ok(Value::Tensor(tensor))
1568}
1569
1570#[runtime_builtin(
1571 name = "DataArray.rank",
1572 category = "io/data",
1573 type_resolver(crate::builtins::io::type_resolvers::data_int_type),
1574 descriptor(crate::builtins::io::data::DATAARRAY_RANK_DESCRIPTOR),
1575 builtin_path = "crate::builtins::io::data"
1576)]
1577async fn data_array_rank_builtin(base: Value) -> BuiltinResult<Value> {
1578 let (path, name) = array_identity(&base, "DataArray.rank")?;
1579 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1580 let meta = manifest
1581 .arrays
1582 .get(&name)
1583 .ok_or_else(|| data_error(format!("DataArray.rank: array '{name}' not found")))?;
1584 Ok(Value::Num(meta.shape.len() as f64))
1585}
1586
1587#[runtime_builtin(
1588 name = "DataArray.chunk_shape",
1589 category = "io/data",
1590 type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
1591 descriptor(crate::builtins::io::data::DATAARRAY_CHUNK_SHAPE_DESCRIPTOR),
1592 builtin_path = "crate::builtins::io::data"
1593)]
1594async fn data_array_chunk_shape_builtin(base: Value) -> BuiltinResult<Value> {
1595 let (path, name) = array_identity(&base, "DataArray.chunk_shape")?;
1596 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1597 let meta = manifest
1598 .arrays
1599 .get(&name)
1600 .ok_or_else(|| data_error(format!("DataArray.chunk_shape: array '{name}' not found")))?;
1601 let values = meta
1602 .chunk_shape
1603 .iter()
1604 .map(|v| *v as f64)
1605 .collect::<Vec<_>>();
1606 let tensor = Tensor::new(values, vec![1, meta.chunk_shape.len()])
1607 .map_err(|err| data_error(format!("DataArray.chunk_shape: {err}")))?;
1608 Ok(Value::Tensor(tensor))
1609}
1610
1611#[runtime_builtin(
1612 name = "DataArray.codec",
1613 category = "io/data",
1614 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1615 descriptor(crate::builtins::io::data::DATAARRAY_CODEC_DESCRIPTOR),
1616 builtin_path = "crate::builtins::io::data"
1617)]
1618async fn data_array_codec_builtin(base: Value) -> BuiltinResult<Value> {
1619 let (path, name) = array_identity(&base, "DataArray.codec")?;
1620 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1621 let meta = manifest
1622 .arrays
1623 .get(&name)
1624 .ok_or_else(|| data_error(format!("DataArray.codec: array '{name}' not found")))?;
1625 Ok(Value::String(meta.codec.clone()))
1626}
1627
1628#[runtime_builtin(
1629 name = "DataArray.read",
1630 category = "io/data",
1631 type_resolver(crate::builtins::io::type_resolvers::data_tensor_type),
1632 descriptor(crate::builtins::io::data::DATAARRAY_READ_DESCRIPTOR),
1633 builtin_path = "crate::builtins::io::data"
1634)]
1635async fn data_array_read_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1636 let (path, name) = array_identity(&base, "DataArray.read")?;
1637 let root = dataset_root(&path);
1638 let manifest = read_manifest_async(&root).await?;
1639 let meta = manifest
1640 .arrays
1641 .get(&name)
1642 .ok_or_else(|| data_error(format!("DataArray.read: array '{name}' not found")))?;
1643 let payload = read_array_payload_async(&root, meta).await?;
1644 let sliced = if let Some(slice_spec) = rest.first() {
1645 read_slice_payload(&payload, slice_spec)?
1646 } else {
1647 payload
1648 };
1649 let tensor = Tensor::new(sliced.values, sliced.shape)
1650 .map_err(|err| data_error(format!("DataArray.read: {err}")))?;
1651 Ok(Value::Tensor(tensor))
1652}
1653
1654#[runtime_builtin(
1655 name = "DataArray.write",
1656 category = "io/data",
1657 sink = true,
1658 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1659 descriptor(crate::builtins::io::data::DATAARRAY_WRITE_DESCRIPTOR),
1660 builtin_path = "crate::builtins::io::data"
1661)]
1662async fn data_array_write_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1663 let (path, name) = array_identity(&base, "DataArray.write")?;
1664 let (slice_spec, value) = match rest.as_slice() {
1665 [v] => (None, v),
1666 [slice, v] => (Some(slice), v),
1667 _ => {
1668 return Err(data_error(
1669 "DataArray.write expects values or (sliceSpec, values) arguments",
1670 ))
1671 }
1672 };
1673 write_array_full_async(&path, &name, slice_spec, value).await?;
1674 Ok(Value::Bool(true))
1675}
1676
1677#[runtime_builtin(
1678 name = "DataArray.resize",
1679 category = "io/data",
1680 sink = true,
1681 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1682 descriptor(crate::builtins::io::data::DATAARRAY_RESIZE_DESCRIPTOR),
1683 builtin_path = "crate::builtins::io::data"
1684)]
1685async fn data_array_resize_builtin(
1686 base: Value,
1687 new_shape: Value,
1688 _rest: Vec<Value>,
1689) -> BuiltinResult<Value> {
1690 let (path, name) = array_identity(&base, "DataArray.resize")?;
1691 let shape = parse_shape_from_value(&new_shape)?;
1692 let root = dataset_root(&path);
1693 let mut manifest = read_manifest_async(&root).await?;
1694 let meta = manifest
1695 .arrays
1696 .get_mut(&name)
1697 .ok_or_else(|| data_error(format!("DataArray.resize: array '{name}' not found")))?;
1698 meta.shape = shape.clone();
1699 let payload = DataArrayPayload {
1700 dtype: meta.dtype.clone(),
1701 shape: shape.clone(),
1702 values: vec![0.0; shape.iter().copied().product()],
1703 };
1704 let (payload_path, chunk_index_path) =
1705 write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1706 meta.data_path = make_rel_data_path(&root, &payload_path)?;
1707 meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1708 manifest.updated_at = now_rfc3339();
1709 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1710 write_manifest_async(&root, &manifest).await?;
1711 Ok(Value::Bool(true))
1712}
1713
1714#[runtime_builtin(
1715 name = "DataArray.fill",
1716 category = "io/data",
1717 sink = true,
1718 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1719 descriptor(crate::builtins::io::data::DATAARRAY_FILL_DESCRIPTOR),
1720 builtin_path = "crate::builtins::io::data"
1721)]
1722async fn data_array_fill_builtin(
1723 base: Value,
1724 value: Value,
1725 _rest: Vec<Value>,
1726) -> BuiltinResult<Value> {
1727 let (path, name) = array_identity(&base, "DataArray.fill")?;
1728 let root = dataset_root(&path);
1729 let mut manifest = read_manifest_async(&root).await?;
1730 let meta = manifest
1731 .arrays
1732 .get_mut(&name)
1733 .ok_or_else(|| data_error(format!("DataArray.fill: array '{name}' not found")))?;
1734 let scalar = scalar_to_f64(&value)?;
1735 let payload = DataArrayPayload {
1736 dtype: meta.dtype.clone(),
1737 shape: meta.shape.clone(),
1738 values: vec![scalar; meta.shape.iter().copied().product()],
1739 };
1740 let (payload_path, chunk_index_path) =
1741 write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1742 meta.data_path = make_rel_data_path(&root, &payload_path)?;
1743 meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1744 manifest.updated_at = now_rfc3339();
1745 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1746 write_manifest_async(&root, &manifest).await?;
1747 Ok(Value::Bool(true))
1748}
1749
1750#[runtime_builtin(
1751 name = "DataTransaction.id",
1752 category = "io/data",
1753 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1754 descriptor(crate::builtins::io::data::DATATX_ID_DESCRIPTOR),
1755 builtin_path = "crate::builtins::io::data"
1756)]
1757async fn data_tx_id_builtin(base: Value) -> BuiltinResult<Value> {
1758 let obj = as_object(&base, "DataTransaction.id")?;
1759 Ok(get_object_prop(obj, "__tx_id")?.clone())
1760}
1761
1762#[runtime_builtin(
1763 name = "DataTransaction.write",
1764 category = "io/data",
1765 sink = true,
1766 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1767 descriptor(crate::builtins::io::data::DATATX_WRITE_DESCRIPTOR),
1768 builtin_path = "crate::builtins::io::data"
1769)]
1770async fn data_tx_write_builtin(
1771 base: Value,
1772 array_name: Value,
1773 slice: Value,
1774 values: Value,
1775 _rest: Vec<Value>,
1776) -> BuiltinResult<Value> {
1777 let tx_id = tx_id_from_object(&base, "DataTransaction.write")?;
1778 let array_name = parse_string(&array_name, "DataTransaction.write arrayName")?;
1779 with_tx_mut(&tx_id, |tx| {
1780 if tx.status != TxnStatus::Open {
1781 return Err(data_error("DataTransaction.write: transaction is not open"));
1782 }
1783 tx.writes.push(PendingWrite {
1784 array: array_name,
1785 slice_spec: Some(slice),
1786 value: values,
1787 });
1788 Ok(())
1789 })?;
1790 Ok(Value::Bool(true))
1791}
1792
1793#[runtime_builtin(
1794 name = "DataTransaction.set_attr",
1795 category = "io/data",
1796 sink = true,
1797 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1798 descriptor(crate::builtins::io::data::DATATX_SET_ATTR_DESCRIPTOR),
1799 builtin_path = "crate::builtins::io::data"
1800)]
1801async fn data_tx_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
1802 let tx_id = tx_id_from_object(&base, "DataTransaction.set_attr")?;
1803 let key = parse_string(&key, "DataTransaction.set_attr key")?;
1804 with_tx_mut(&tx_id, |tx| {
1805 if tx.status != TxnStatus::Open {
1806 return Err(data_error(
1807 "DataTransaction.set_attr: transaction is not open",
1808 ));
1809 }
1810 tx.attrs.insert(key, value);
1811 Ok(())
1812 })?;
1813 Ok(Value::Bool(true))
1814}
1815
1816#[runtime_builtin(
1817 name = "DataTransaction.set_attrs",
1818 category = "io/data",
1819 sink = true,
1820 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1821 descriptor(crate::builtins::io::data::DATATX_SET_ATTRS_DESCRIPTOR),
1822 builtin_path = "crate::builtins::io::data"
1823)]
1824async fn data_tx_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
1825 let tx_id = tx_id_from_object(&base, "DataTransaction.set_attrs")?;
1826 let Value::Struct(incoming) = attrs else {
1827 return Err(data_error(
1828 "DataTransaction.set_attrs: attrs must be struct",
1829 ));
1830 };
1831 with_tx_mut(&tx_id, |tx| {
1832 if tx.status != TxnStatus::Open {
1833 return Err(data_error(
1834 "DataTransaction.set_attrs: transaction is not open",
1835 ));
1836 }
1837 for (k, v) in incoming.fields {
1838 tx.attrs.insert(k, v);
1839 }
1840 Ok(())
1841 })?;
1842 Ok(Value::Bool(true))
1843}
1844
1845#[runtime_builtin(
1846 name = "DataTransaction.resize",
1847 category = "io/data",
1848 sink = true,
1849 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1850 descriptor(crate::builtins::io::data::DATATX_RESIZE_DESCRIPTOR),
1851 builtin_path = "crate::builtins::io::data"
1852)]
1853async fn data_tx_resize_builtin(
1854 base: Value,
1855 array_name: Value,
1856 new_shape: Value,
1857 _rest: Vec<Value>,
1858) -> BuiltinResult<Value> {
1859 let tx_id = tx_id_from_object(&base, "DataTransaction.resize")?;
1860 let array_name = parse_string(&array_name, "DataTransaction.resize arrayName")?;
1861 let shape = parse_shape_from_value(&new_shape)?;
1862 with_tx_mut(&tx_id, |tx| {
1863 if tx.status != TxnStatus::Open {
1864 return Err(data_error(
1865 "DataTransaction.resize: transaction is not open",
1866 ));
1867 }
1868 tx.resizes.push(PendingResize {
1869 array: array_name,
1870 shape,
1871 });
1872 Ok(())
1873 })?;
1874 Ok(Value::Bool(true))
1875}
1876
1877#[runtime_builtin(
1878 name = "DataTransaction.fill",
1879 category = "io/data",
1880 sink = true,
1881 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1882 descriptor(crate::builtins::io::data::DATATX_FILL_DESCRIPTOR),
1883 builtin_path = "crate::builtins::io::data"
1884)]
1885async fn data_tx_fill_builtin(
1886 base: Value,
1887 array_name: Value,
1888 value: Value,
1889 rest: Vec<Value>,
1890) -> BuiltinResult<Value> {
1891 let tx_id = tx_id_from_object(&base, "DataTransaction.fill")?;
1892 let array_name = parse_string(&array_name, "DataTransaction.fill arrayName")?;
1893 let slice_spec = rest.first().cloned();
1894 with_tx_mut(&tx_id, |tx| {
1895 if tx.status != TxnStatus::Open {
1896 return Err(data_error("DataTransaction.fill: transaction is not open"));
1897 }
1898 tx.fills.push(PendingFill {
1899 array: array_name,
1900 slice_spec,
1901 value,
1902 });
1903 Ok(())
1904 })?;
1905 Ok(Value::Bool(true))
1906}
1907
1908#[runtime_builtin(
1909 name = "DataTransaction.delete_array",
1910 category = "io/data",
1911 sink = true,
1912 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1913 descriptor(crate::builtins::io::data::DATATX_DELETE_ARRAY_DESCRIPTOR),
1914 builtin_path = "crate::builtins::io::data"
1915)]
1916async fn data_tx_delete_array_builtin(base: Value, array_name: Value) -> BuiltinResult<Value> {
1917 let tx_id = tx_id_from_object(&base, "DataTransaction.delete_array")?;
1918 let array_name = parse_string(&array_name, "DataTransaction.delete_array arrayName")?;
1919 with_tx_mut(&tx_id, |tx| {
1920 if tx.status != TxnStatus::Open {
1921 return Err(data_error(
1922 "DataTransaction.delete_array: transaction is not open",
1923 ));
1924 }
1925 tx.delete_arrays.push(array_name);
1926 Ok(())
1927 })?;
1928 Ok(Value::Bool(true))
1929}
1930
1931#[runtime_builtin(
1932 name = "DataTransaction.create_array",
1933 category = "io/data",
1934 sink = true,
1935 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1936 descriptor(crate::builtins::io::data::DATATX_CREATE_ARRAY_DESCRIPTOR),
1937 builtin_path = "crate::builtins::io::data"
1938)]
1939async fn data_tx_create_array_builtin(
1940 base: Value,
1941 array_name: Value,
1942 meta: Value,
1943) -> BuiltinResult<Value> {
1944 let tx_id = tx_id_from_object(&base, "DataTransaction.create_array")?;
1945 let array_name = parse_string(&array_name, "DataTransaction.create_array arrayName")?;
1946 let meta = parse_array_meta(&array_name, &meta)?;
1947 with_tx_mut(&tx_id, |tx| {
1948 if tx.status != TxnStatus::Open {
1949 return Err(data_error(
1950 "DataTransaction.create_array: transaction is not open",
1951 ));
1952 }
1953 tx.create_arrays.push(PendingCreateArray {
1954 array: array_name,
1955 meta,
1956 });
1957 Ok(())
1958 })?;
1959 Ok(Value::Bool(true))
1960}
1961
// Builtin `DataTransaction.commit`: applies every operation staged on the
// transaction to the dataset manifest, writes the manifest, and retires the
// transaction.
//
// `base` is the transaction receiver. `rest` may carry an options struct as
// its first element; only its `if_manifest` field is consulted here.
//
// Returns `Value::Bool(true)` on success. Fails when the transaction is not
// open, when `ensure_manifest_sequence` or the `if_manifest` precondition
// rejects the current manifest, or when any staged operation or the manifest
// write fails. On any of those failures the function returns before
// `remove_tx`, so the transaction stays registered.
#[runtime_builtin(
    name = "DataTransaction.commit",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    descriptor(crate::builtins::io::data::DATATX_COMMIT_DESCRIPTOR),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_commit_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.commit")?;
    // Snapshot the staged operations so the async work below does not run
    // inside the `with_tx` closure.
    let (dataset_path, base_sequence, writes, resizes, fills, create_arrays, delete_arrays, attrs) =
        with_tx(&tx_id, |tx| {
            if tx.status != TxnStatus::Open {
                return Err(data_error(
                    "DataTransaction.commit: transaction is not open",
                ));
            }
            Ok((
                tx.dataset_path.clone(),
                tx.base_sequence,
                tx.writes.clone(),
                tx.resizes.clone(),
                tx.fills.clone(),
                tx.create_arrays.clone(),
                tx.delete_arrays.clone(),
                tx.attrs.clone(),
            ))
        })?;

    // Operation counts are captured now because the loops below consume the
    // vectors; they are only used for the final log record.
    let write_ops = writes.len();
    let resize_ops = resizes.len();
    let fill_ops = fills.len();
    let create_ops = create_arrays.len();
    let delete_ops = delete_arrays.len();
    let attr_updates = attrs.len();

    let root = dataset_root(&dataset_path);
    let mut manifest = read_manifest_async(&root).await?;
    ensure_manifest_sequence(base_sequence, &manifest)?;
    // Optional precondition: the caller's expected manifest version token must
    // match the token of the manifest just read.
    if let Some(Value::Struct(options)) = rest.first() {
        if let Some(expected) = options.fields.get("if_manifest") {
            let expected = parse_string(expected, "DataTransaction.commit if_manifest")?;
            let actual = manifest_version_token(&manifest);
            if expected != actual {
                tracing::warn!(
                    target: "runmat.data",
                    tx_id = tx_id,
                    expected_manifest = expected,
                    actual_manifest = actual,
                    "data transaction manifest conflict"
                );
                return Err(data_error(
                    "MANIFEST_CONFLICT: if_manifest precondition failed",
                ));
            }
        }
    }
    // Staged operations are applied in a fixed order: creates, resizes, fills,
    // writes, deletes, then attribute updates. Each helper is handed `root`
    // and the in-memory manifest; the manifest file itself is written once,
    // after all of them succeed.
    for create in create_arrays {
        create_array_in_manifest(&root, &mut manifest, &create.array, create.meta).await?;
    }
    for resize in resizes {
        resize_array_in_manifest(&root, &mut manifest, &resize.array, resize.shape).await?;
    }
    for fill in fills {
        fill_array_in_manifest(
            &root,
            &mut manifest,
            &fill.array,
            fill.slice_spec.as_ref(),
            &fill.value,
        )
        .await?;
    }
    for write in writes {
        apply_write_to_manifest_async(
            &root,
            &mut manifest,
            &write.array,
            write.slice_spec.as_ref(),
            &write.value,
        )
        .await?;
    }
    for array_name in delete_arrays {
        delete_array_in_manifest_async(&root, &mut manifest, &array_name).await?;
    }
    for (k, v) in attrs {
        manifest.attrs.insert(k, value_to_json(&v));
    }
    // Stamp the manifest and bump its transaction sequence before persisting.
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    with_tx_mut(&tx_id, |tx| {
        tx.status = TxnStatus::Committed;
        Ok(())
    })?;
    tracing::info!(
        target: "runmat.data",
        dataset = dataset_path,
        tx_id = tx_id,
        write_ops = write_ops,
        resize_ops = resize_ops,
        fill_ops = fill_ops,
        create_ops = create_ops,
        delete_ops = delete_ops,
        attr_updates = attr_updates,
        next_sequence = manifest.txn_sequence,
        "data transaction commit"
    );
    // The transaction is removed right after being marked committed.
    remove_tx(&tx_id);
    Ok(Value::Bool(true))
}
2074
2075#[runtime_builtin(
2076 name = "commit",
2077 category = "io/data",
2078 sink = true,
2079 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
2080 descriptor(crate::builtins::io::data::COMMIT_ALIAS_DESCRIPTOR),
2081 builtin_path = "crate::builtins::io::data"
2082)]
2083async fn data_tx_commit_alias_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
2084 match &base {
2085 Value::Object(obj) if obj.class_name == "DataTransaction" => {
2086 data_tx_commit_builtin(base, rest).await
2087 }
2088 Value::HandleObject(handle) if handle.class_name == "DataTransaction" => {
2089 data_tx_commit_builtin(base, rest).await
2090 }
2091 _ => Err(data_error(
2092 "commit: receiver must be a DataTransaction (use tx = ds.begin())",
2093 )),
2094 }
2095}
2096
2097#[runtime_builtin(
2098 name = "DataTransaction.abort",
2099 category = "io/data",
2100 sink = true,
2101 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
2102 descriptor(crate::builtins::io::data::DATATX_ABORT_DESCRIPTOR),
2103 builtin_path = "crate::builtins::io::data"
2104)]
2105async fn data_tx_abort_builtin(base: Value) -> BuiltinResult<Value> {
2106 let tx_id = tx_id_from_object(&base, "DataTransaction.abort")?;
2107 with_tx_mut(&tx_id, |tx| {
2108 tx.status = TxnStatus::Aborted;
2109 Ok(())
2110 })?;
2111 tracing::info!(
2112 target: "runmat.data",
2113 tx_id = tx_id,
2114 "data transaction abort"
2115 );
2116 remove_tx(&tx_id);
2117 Ok(Value::Bool(true))
2118}
2119
2120#[runtime_builtin(
2121 name = "DataTransaction.status",
2122 category = "io/data",
2123 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
2124 descriptor(crate::builtins::io::data::DATATX_STATUS_DESCRIPTOR),
2125 builtin_path = "crate::builtins::io::data"
2126)]
2127async fn data_tx_status_builtin(base: Value) -> BuiltinResult<Value> {
2128 let tx_id = tx_id_from_object(&base, "DataTransaction.status")?;
2129 with_tx(&tx_id, |tx| {
2130 let status = match tx.status {
2131 TxnStatus::Open => "open",
2132 TxnStatus::Committed => "committed",
2133 TxnStatus::Aborted => "aborted",
2134 };
2135 Ok(Value::String(status.to_string()))
2136 })
2137}
2138
2139fn dataset_path_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
2140 let obj = as_object(base, context)?;
2141 parse_string(get_object_prop(obj, "__data_path")?, context)
2142}
2143
2144fn tx_id_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
2145 let obj = as_object(base, context)?;
2146 parse_string(get_object_prop(obj, "__tx_id")?, context)
2147}
2148
2149fn array_identity(base: &Value, context: &str) -> BuiltinResult<(String, String)> {
2150 let obj = as_object(base, context)?;
2151 let path = parse_string(get_object_prop(obj, "__data_path")?, context)?;
2152 let name = parse_string(get_object_prop(obj, "__array_name")?, context)?;
2153 Ok((path, name))
2154}
2155
2156fn as_object<'a>(value: &'a Value, context: &str) -> BuiltinResult<&'a ObjectInstance> {
2157 match value {
2158 Value::Object(obj) => Ok(obj),
2159 _ => Err(data_error(format!("{context}: expected object receiver"))),
2160 }
2161}
2162
2163async fn hydrate_dataset_descriptor_async(path: &str, dataset: &mut Value) {
2164 let request = runmat_filesystem::data_contract::DataManifestRequest {
2165 path: path.to_string(),
2166 version: None,
2167 };
2168 let descriptor = match runmat_filesystem::data_manifest_descriptor_async(&request).await {
2169 Ok(descriptor) => descriptor,
2170 Err(_) => return,
2171 };
2172 let Value::Object(obj) = dataset else {
2173 return;
2174 };
2175 if !descriptor.dataset_id.is_empty() {
2176 obj.properties.insert(
2177 "__data_id".to_string(),
2178 Value::String(descriptor.dataset_id),
2179 );
2180 }
2181 obj.properties.insert(
2182 "__data_version".to_string(),
2183 Value::String(format!(
2184 "{}:{}",
2185 descriptor.updated_at, descriptor.txn_sequence
2186 )),
2187 );
2188}
2189
/// Replaces every character that is not an ASCII letter, ASCII digit, `-`, or
/// `_` with `_`. The result has the same number of characters as `label`.
fn sanitize_label(label: &str) -> String {
    let mut cleaned = String::with_capacity(label.len());
    for ch in label.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
2202
2203async fn copy_file(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
2204 let bytes = runmat_filesystem::read_async(src)
2205 .await
2206 .map_err(|err| data_error(format!("failed to open '{}': {err}", src.display())))?;
2207 let parent = dst.parent().ok_or_else(|| {
2208 data_error(format!(
2209 "invalid destination path '{}': missing parent",
2210 dst.display()
2211 ))
2212 })?;
2213 runmat_filesystem::create_dir_all_async(parent)
2214 .await
2215 .map_err(|err| data_error(format!("failed to create '{}': {err}", parent.display())))?;
2216 runmat_filesystem::write_async(dst, &bytes)
2217 .await
2218 .map_err(|err| {
2219 data_error(format!(
2220 "failed to copy '{}' -> '{}': {err}",
2221 src.display(),
2222 dst.display()
2223 ))
2224 })?;
2225 Ok(())
2226}
2227
2228fn make_rel_data_path(
2229 root: &std::path::Path,
2230 payload_path: &std::path::Path,
2231) -> BuiltinResult<String> {
2232 let rel = payload_path
2233 .strip_prefix(root)
2234 .map_err(|err| data_error(format!("failed to compute relative data path: {err}")))?;
2235 Ok(rel.to_string_lossy().to_string())
2236}
2237
2238async fn create_array_in_manifest(
2239 root: &std::path::Path,
2240 manifest: &mut DataManifest,
2241 array_name: &str,
2242 mut meta: DataArrayMeta,
2243) -> BuiltinResult<()> {
2244 if manifest.arrays.contains_key(array_name) {
2245 return Err(data_error(format!(
2246 "DataTransaction.create_array: array '{array_name}' already exists"
2247 )));
2248 }
2249 let payload = DataArrayPayload {
2250 dtype: meta.dtype.clone(),
2251 shape: meta.shape.clone(),
2252 values: vec![0.0; meta.shape.iter().copied().product()],
2253 };
2254 let (payload_path, chunk_index_path) =
2255 write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
2256 meta.data_path = make_rel_data_path(root, &payload_path)?;
2257 meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2258 manifest.arrays.insert(array_name.to_string(), meta);
2259 Ok(())
2260}
2261
2262async fn resize_array_in_manifest(
2263 root: &std::path::Path,
2264 manifest: &mut DataManifest,
2265 array_name: &str,
2266 shape: Vec<usize>,
2267) -> BuiltinResult<()> {
2268 let meta = manifest
2269 .arrays
2270 .get_mut(array_name)
2271 .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2272 meta.shape = shape.clone();
2273 let payload = DataArrayPayload {
2274 dtype: meta.dtype.clone(),
2275 shape: shape.clone(),
2276 values: vec![0.0; shape.iter().copied().product()],
2277 };
2278 let (payload_path, chunk_index_path) =
2279 write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
2280 meta.data_path = make_rel_data_path(root, &payload_path)?;
2281 meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2282 Ok(())
2283}
2284
2285async fn fill_array_in_manifest(
2286 root: &std::path::Path,
2287 manifest: &mut DataManifest,
2288 array_name: &str,
2289 slice_spec: Option<&Value>,
2290 value: &Value,
2291) -> BuiltinResult<()> {
2292 let meta: DataArrayMeta = manifest
2293 .arrays
2294 .get(array_name)
2295 .cloned()
2296 .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2297 let scalar = scalar_to_f64(value)?;
2298 let payload = read_array_payload_async(root, &meta).await?;
2299 let next_payload = if let Some(slice_spec) = slice_spec {
2300 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2301 let target_shape: Vec<usize> = ranges
2302 .iter()
2303 .map(|r| r.end.saturating_sub(r.start))
2304 .collect();
2305 let rhs = Value::Tensor(
2306 Tensor::new(
2307 vec![scalar; target_shape.iter().copied().product()],
2308 target_shape,
2309 )
2310 .map_err(|err| data_error(format!("DataTransaction.fill: {err}")))?,
2311 );
2312 write_slice_payload(&payload, slice_spec, &rhs)?
2313 } else {
2314 DataArrayPayload {
2315 dtype: payload.dtype,
2316 shape: payload.shape.clone(),
2317 values: vec![scalar; payload.shape.iter().copied().product()],
2318 }
2319 };
2320 let (payload_path, chunk_index_path) =
2321 write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
2322 if let Some(updated) = manifest.arrays.get_mut(array_name) {
2323 updated.shape = next_payload.shape.clone();
2324 updated.data_path = make_rel_data_path(root, &payload_path)?;
2325 updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2326 }
2327 Ok(())
2328}
2329
2330async fn delete_array_in_manifest_async(
2331 root: &std::path::Path,
2332 manifest: &mut DataManifest,
2333 array_name: &str,
2334) -> BuiltinResult<()> {
2335 let removed = manifest.arrays.remove(array_name);
2336 if removed.is_none() {
2337 return Err(data_error(format!(
2338 "DataTransaction.delete_array: array '{array_name}' not found"
2339 )));
2340 }
2341 let array_dir = root.join("arrays").join(array_name);
2342 if runmat_filesystem::metadata_async(&array_dir).await.is_ok() {
2343 runmat_filesystem::remove_dir_all_async(&array_dir)
2344 .await
2345 .map_err(|err| {
2346 data_error(format!(
2347 "DataTransaction.delete_array: failed to remove '{}': {err}",
2348 array_dir.display()
2349 ))
2350 })?;
2351 }
2352 Ok(())
2353}
2354
2355fn parse_array_meta(array_name: &str, meta: &Value) -> BuiltinResult<DataArrayMeta> {
2356 let Value::Struct(meta_struct) = meta else {
2357 return Err(data_error(
2358 "DataTransaction.create_array: meta must be a struct",
2359 ));
2360 };
2361 let dtype = meta_struct
2362 .fields
2363 .get("dtype")
2364 .map(|v| parse_string(v, "DataTransaction.create_array dtype"))
2365 .transpose()?
2366 .unwrap_or_else(|| "f64".to_string());
2367 let shape = meta_struct
2368 .fields
2369 .get("shape")
2370 .map(parse_shape_from_value)
2371 .transpose()?
2372 .unwrap_or_else(|| vec![0, 0]);
2373 let chunk_shape = meta_struct
2374 .fields
2375 .get("chunk")
2376 .map(parse_shape_from_value)
2377 .transpose()?
2378 .unwrap_or_else(|| default_chunk_shape(&shape));
2379 let codec = meta_struct
2380 .fields
2381 .get("codec")
2382 .map(|v| parse_string(v, "DataTransaction.create_array codec"))
2383 .transpose()?
2384 .unwrap_or_else(|| "zstd".to_string());
2385 Ok(DataArrayMeta {
2386 dtype,
2387 shape,
2388 chunk_shape,
2389 order: "column_major".to_string(),
2390 codec,
2391 chunk_index_path: Some(format!("arrays/{array_name}/chunks/index.json")),
2392 data_path: format!("arrays/{array_name}/data.f64.json"),
2393 })
2394}
2395
/// Picks a default chunk shape for an array of the given `shape`.
///
/// - Empty shape: `[1024]`.
/// - One dimension: the extent clamped to `1..=65_536`.
/// - Otherwise: the first two extents clamped to `1..=256`, every further
///   extent clamped to `1..=8`.
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape {
        [] => vec![1024],
        [only] => vec![(*only).clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(axis, &extent)| {
                let cap = if axis < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
2412
2413#[async_recursion::async_recursion(?Send)]
2414async fn copy_dir_recursive(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
2415 let metadata = runmat_filesystem::metadata_async(src)
2416 .await
2417 .map_err(|err| data_error(format!("failed to stat '{}': {err}", src.display())))?;
2418 if !metadata.is_dir() {
2419 return Err(data_error(format!(
2420 "expected dataset directory at '{}'",
2421 src.display()
2422 )));
2423 }
2424 runmat_filesystem::create_dir_all_async(dst)
2425 .await
2426 .map_err(|err| data_error(format!("failed to create '{}': {err}", dst.display())))?;
2427 for entry in runmat_filesystem::read_dir_async(src)
2428 .await
2429 .map_err(|err| data_error(format!("failed to read '{}': {err}", src.display())))?
2430 {
2431 let entry_src = entry.path().to_path_buf();
2432 let entry_dst = dst.join(entry.file_name());
2433 if entry.is_dir() {
2434 copy_dir_recursive(&entry_src, &entry_dst).await?;
2435 continue;
2436 }
2437 copy_file(&entry_src, &entry_dst).await?;
2438 }
2439 Ok(())
2440}
2441
2442fn parse_shape_from_value(value: &Value) -> BuiltinResult<Vec<usize>> {
2443 match value {
2444 Value::Tensor(t) => {
2445 let mut out = Vec::with_capacity(t.data.len());
2446 for v in &t.data {
2447 if !v.is_finite() || *v < 0.0 {
2448 return Err(data_error(
2449 "shape dimensions must be non-negative finite numbers",
2450 ));
2451 }
2452 out.push(*v as usize);
2453 }
2454 Ok(out)
2455 }
2456 Value::Num(v) => {
2457 if !v.is_finite() || *v < 0.0 {
2458 return Err(data_error(
2459 "shape dimensions must be non-negative finite numbers",
2460 ));
2461 }
2462 Ok(vec![*v as usize])
2463 }
2464 Value::Int(v) => {
2465 let n = v.to_i64();
2466 if n < 0 {
2467 return Err(data_error("shape dimensions must be non-negative"));
2468 }
2469 Ok(vec![n as usize])
2470 }
2471 _ => Err(data_error("shape must be a numeric vector")),
2472 }
2473}
2474
2475fn scalar_to_f64(value: &Value) -> BuiltinResult<f64> {
2476 match value {
2477 Value::Num(v) => Ok(*v),
2478 Value::Int(v) => Ok(v.to_i64() as f64),
2479 _ => Err(data_error("expected numeric scalar")),
2480 }
2481}
2482
2483async fn write_array_full_async(
2484 dataset_path: &str,
2485 array_name: &str,
2486 slice_spec: Option<&Value>,
2487 value: &Value,
2488) -> BuiltinResult<()> {
2489 let root = dataset_root(dataset_path);
2490 let mut manifest = read_manifest_async(&root).await?;
2491 apply_write_to_manifest_async(&root, &mut manifest, array_name, slice_spec, value).await?;
2492 manifest.updated_at = now_rfc3339();
2493 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
2494 write_manifest_async(&root, &manifest).await
2495}
2496
2497async fn apply_write_to_manifest_async(
2498 root: &std::path::Path,
2499 manifest: &mut DataManifest,
2500 array_name: &str,
2501 slice_spec: Option<&Value>,
2502 value: &Value,
2503) -> BuiltinResult<()> {
2504 let meta: DataArrayMeta = manifest
2505 .arrays
2506 .get(array_name)
2507 .cloned()
2508 .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2509
2510 if let Some(slice_spec) = slice_spec {
2511 if apply_slice_write_chunked_async(root, manifest, array_name, &meta, slice_spec, value)
2512 .await?
2513 {
2514 return Ok(());
2515 }
2516 }
2517
2518 let payload = read_array_payload_async(root, &meta).await?;
2519 let mut next_payload = payload.clone();
2520 if let Some(slice_spec) = slice_spec {
2521 next_payload = write_slice_payload(&payload, slice_spec, value)?;
2522 } else {
2523 let (shape, values) = value_to_tensor_shape_values(value)?;
2524 next_payload.shape = shape;
2525 next_payload.values = values;
2526 }
2527
2528 let (payload_path, chunk_index_path) =
2529 write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
2530 if let Some(updated) = manifest.arrays.get_mut(array_name) {
2531 updated.shape = next_payload.shape.clone();
2532 updated.data_path = make_rel_data_path(root, &payload_path)?;
2533 updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2534 }
2535 Ok(())
2536}
2537
2538async fn apply_slice_write_chunked_async(
2539 root: &std::path::Path,
2540 manifest: &mut DataManifest,
2541 array_name: &str,
2542 meta: &DataArrayMeta,
2543 slice_spec: &Value,
2544 value: &Value,
2545) -> BuiltinResult<bool> {
2546 let Some(index_rel_path) = &meta.chunk_index_path else {
2547 return Ok(false);
2548 };
2549 let index_path = root.join(index_rel_path);
2550 if runmat_filesystem::metadata_async(&index_path)
2551 .await
2552 .is_err()
2553 {
2554 return Ok(false);
2555 }
2556 let ranges = parse_slice_spec(slice_spec, &meta.shape)?;
2557 let rhs_shape: Vec<usize> = ranges
2558 .iter()
2559 .map(|r| r.end.saturating_sub(r.start))
2560 .collect();
2561 let (actual_rhs_shape, rhs_values) = value_to_tensor_shape_values(value)?;
2562 if actual_rhs_shape != rhs_shape {
2563 return Err(data_error(format!(
2564 "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
2565 actual_rhs_shape, rhs_shape
2566 )));
2567 }
2568
2569 let index_bytes = runmat_filesystem::read_async(&index_path)
2570 .await
2571 .map_err(|err| {
2572 data_error(format!(
2573 "failed to read chunk index '{}': {err}",
2574 index_path.display()
2575 ))
2576 })?;
2577 let mut chunk_index: DataChunkIndex = serde_json::from_slice(&index_bytes).map_err(|err| {
2578 data_error(format!(
2579 "failed to parse chunk index '{}': {err}",
2580 index_path.display()
2581 ))
2582 })?;
2583
2584 let mut pos_by_key = HashMap::new();
2585 for (idx, entry) in chunk_index.chunks.iter().enumerate() {
2586 pos_by_key.insert(entry.key.clone(), idx);
2587 }
2588 let touched = touched_chunk_coords(&ranges, &meta.chunk_shape, &meta.shape);
2589 let mut upload_batch = Vec::<(DataChunkDescriptor, Vec<u8>)>::new();
2590
2591 for coords in touched {
2592 let key = chunk_key(&coords);
2593 let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
2594 let chunk_extent = chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape);
2595 let intersection = chunk_intersection(&ranges, &chunk_start, &chunk_extent);
2596 if intersection.is_empty() {
2597 continue;
2598 }
2599
2600 let (entry_index, existed, mut entry, mut chunk_payload) = load_or_init_chunk(
2601 root,
2602 array_name,
2603 &key,
2604 &coords,
2605 &chunk_extent,
2606 &pos_by_key,
2607 &chunk_index,
2608 )
2609 .await?;
2610
2611 let mut local = vec![0usize; intersection.len()];
2612 let intersection_shape: Vec<usize> = intersection
2613 .iter()
2614 .map(|r| r.end.saturating_sub(r.start))
2615 .collect();
2616 loop {
2617 let mut global = Vec::with_capacity(intersection.len());
2618 for dim in 0..intersection.len() {
2619 global.push(intersection[dim].start + local[dim]);
2620 }
2621 let rhs_index: Vec<usize> = global
2622 .iter()
2623 .enumerate()
2624 .map(|(dim, g)| g.saturating_sub(ranges[dim].start))
2625 .collect();
2626 let chunk_index_local: Vec<usize> = global
2627 .iter()
2628 .enumerate()
2629 .map(|(dim, g)| g.saturating_sub(chunk_start[dim]))
2630 .collect();
2631 let rhs_linear = linear_index_column_major(&rhs_index, &rhs_shape)?;
2632 let chunk_linear = linear_index_column_major(&chunk_index_local, &chunk_extent)?;
2633 chunk_payload.values[chunk_linear] = rhs_values[rhs_linear];
2634 if !advance_index(&mut local, &intersection_shape) {
2635 break;
2636 }
2637 }
2638
2639 let chunk_bytes = serde_json::to_vec(&chunk_payload)
2640 .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
2641 let chunk_path = root.join(&entry.data_path);
2642 runmat_filesystem::write_async(&chunk_path, &chunk_bytes)
2643 .await
2644 .map_err(|err| {
2645 data_error(format!(
2646 "failed to write chunk payload '{}': {err}",
2647 chunk_path.display()
2648 ))
2649 })?;
2650
2651 entry.coords = coords.clone();
2652 entry.shape = chunk_extent.clone();
2653 entry.bytes_raw = chunk_bytes.len() as u64;
2654 entry.bytes_stored = chunk_bytes.len() as u64;
2655 entry.hash = sha256_hex(&chunk_bytes);
2656 if existed {
2657 chunk_index.chunks[entry_index] = entry.clone();
2658 } else {
2659 chunk_index.chunks.push(entry.clone());
2660 pos_by_key.insert(key.clone(), chunk_index.chunks.len() - 1);
2661 }
2662 upload_batch.push((
2663 DataChunkDescriptor {
2664 key: key.clone(),
2665 object_id: entry.object_id.clone(),
2666 hash: entry.hash.clone(),
2667 bytes_raw: entry.bytes_raw,
2668 bytes_stored: entry.bytes_stored,
2669 },
2670 chunk_bytes,
2671 ));
2672 }
2673
2674 maybe_upload_chunk_batch_async(root, array_name, upload_batch).await?;
2675 tracing::info!(
2676 target: "runmat.data",
2677 dataset = %root.display(),
2678 array = array_name,
2679 touched_chunks = chunk_index.chunks.len(),
2680 "chunked slice write committed"
2681 );
2682 let index_write = serde_json::to_vec(&chunk_index)
2683 .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
2684 runmat_filesystem::write_async(&index_path, &index_write)
2685 .await
2686 .map_err(|err| {
2687 data_error(format!(
2688 "failed to write chunk index '{}': {err}",
2689 index_path.display()
2690 ))
2691 })?;
2692
2693 if let Some(updated) = manifest.arrays.get_mut(array_name) {
2694 updated.shape = meta.shape.clone();
2695 updated.chunk_index_path = Some(index_rel_path.clone());
2696 }
2697 Ok(true)
2698}
2699
2700fn value_to_tensor_shape_values(value: &Value) -> BuiltinResult<(Vec<usize>, Vec<f64>)> {
2701 match value {
2702 Value::Tensor(t) => Ok((t.shape.clone(), t.data.clone())),
2703 Value::Num(n) => Ok((vec![1, 1], vec![*n])),
2704 Value::Int(i) => Ok((vec![1, 1], vec![i.to_i64() as f64])),
2705 _ => Err(data_error(
2706 "DataArray.write supports tensor or numeric scalar values",
2707 )),
2708 }
2709}
2710
/// Half-open index interval `[start, end)` along one array dimension.
///
/// The number of selected elements is `end - start`.
#[derive(Clone, Copy, Debug)]
struct DimRange {
    /// Zero-based index of the first selected element.
    start: usize,
    /// Zero-based index one past the last selected element.
    end: usize,
}
2716
2717fn read_slice_payload(
2718 payload: &DataArrayPayload,
2719 slice_spec: &Value,
2720) -> BuiltinResult<DataArrayPayload> {
2721 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2722 let out_shape: Vec<usize> = ranges
2723 .iter()
2724 .map(|r| r.end.saturating_sub(r.start))
2725 .collect();
2726 let mut out_values = Vec::new();
2727 let mut out_index = vec![0usize; out_shape.len()];
2728 loop {
2729 let source_index: Vec<usize> = out_index
2730 .iter()
2731 .enumerate()
2732 .map(|(dim, idx)| ranges[dim].start + *idx)
2733 .collect();
2734 let linear = linear_index_column_major(&source_index, &payload.shape)?;
2735 out_values.push(payload.values[linear]);
2736
2737 if !advance_index(&mut out_index, &out_shape) {
2738 break;
2739 }
2740 }
2741 Ok(DataArrayPayload {
2742 dtype: payload.dtype.clone(),
2743 shape: out_shape,
2744 values: out_values,
2745 })
2746}
2747
2748fn write_slice_payload(
2749 payload: &DataArrayPayload,
2750 slice_spec: &Value,
2751 rhs: &Value,
2752) -> BuiltinResult<DataArrayPayload> {
2753 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2754 let target_shape: Vec<usize> = ranges
2755 .iter()
2756 .map(|r| r.end.saturating_sub(r.start))
2757 .collect();
2758 let (rhs_shape, rhs_values) = value_to_tensor_shape_values(rhs)?;
2759 if rhs_shape != target_shape {
2760 return Err(data_error(format!(
2761 "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
2762 rhs_shape, target_shape
2763 )));
2764 }
2765
2766 let mut next = payload.values.clone();
2767 let mut rhs_index = vec![0usize; target_shape.len()];
2768 let mut rhs_linear = 0usize;
2769 loop {
2770 let target_index: Vec<usize> = rhs_index
2771 .iter()
2772 .enumerate()
2773 .map(|(dim, idx)| ranges[dim].start + *idx)
2774 .collect();
2775 let target_linear = linear_index_column_major(&target_index, &payload.shape)?;
2776 next[target_linear] = rhs_values[rhs_linear];
2777 rhs_linear += 1;
2778
2779 if !advance_index(&mut rhs_index, &target_shape) {
2780 break;
2781 }
2782 }
2783
2784 Ok(DataArrayPayload {
2785 dtype: payload.dtype.clone(),
2786 shape: payload.shape.clone(),
2787 values: next,
2788 })
2789}
2790
2791fn parse_slice_spec(slice_spec: &Value, shape: &[usize]) -> BuiltinResult<Vec<DimRange>> {
2792 match slice_spec {
2793 Value::Cell(cell) => {
2794 if cell.data.is_empty() {
2795 return Err(data_error("INVALID_SLICE: empty slice specification"));
2796 }
2797 let mut ranges = Vec::with_capacity(shape.len());
2798 for (dim, extent) in shape.iter().enumerate() {
2799 if let Some(item) = cell.data.get(dim).map(|v| &**v) {
2800 ranges.push(parse_dim_range(item, *extent)?);
2801 } else {
2802 ranges.push(DimRange {
2803 start: 0,
2804 end: *extent,
2805 });
2806 }
2807 }
2808 Ok(ranges)
2809 }
2810 Value::String(s) if s == ":" => Ok(shape
2811 .iter()
2812 .map(|extent| DimRange {
2813 start: 0,
2814 end: *extent,
2815 })
2816 .collect()),
2817 _ => Err(data_error(
2818 "INVALID_SLICE: slice must be a cell spec like {1:10, :} or ':'",
2819 )),
2820 }
2821}
2822
2823fn parse_dim_range(value: &Value, extent: usize) -> BuiltinResult<DimRange> {
2824 if extent == 0 {
2825 return Ok(DimRange { start: 0, end: 0 });
2826 }
2827 match value {
2828 Value::String(s) if s == ":" => Ok(DimRange {
2829 start: 0,
2830 end: extent,
2831 }),
2832 Value::Num(n) => {
2833 let idx = (*n as isize) - 1;
2834 if idx < 0 || idx as usize >= extent {
2835 return Err(data_error("INVALID_SLICE: index out of bounds"));
2836 }
2837 Ok(DimRange {
2838 start: idx as usize,
2839 end: idx as usize + 1,
2840 })
2841 }
2842 Value::Int(i) => {
2843 let idx = i.to_i64() - 1;
2844 if idx < 0 || idx as usize >= extent {
2845 return Err(data_error("INVALID_SLICE: index out of bounds"));
2846 }
2847 Ok(DimRange {
2848 start: idx as usize,
2849 end: idx as usize + 1,
2850 })
2851 }
2852 Value::Tensor(t) if t.data.len() == 2 => {
2853 let start = (t.data[0] as isize) - 1;
2854 let end_inclusive = (t.data[1] as isize) - 1;
2855 if start < 0 || end_inclusive < start || end_inclusive as usize >= extent {
2856 return Err(data_error("INVALID_SLICE: range out of bounds"));
2857 }
2858 Ok(DimRange {
2859 start: start as usize,
2860 end: end_inclusive as usize + 1,
2861 })
2862 }
2863 _ => Err(data_error(
2864 "INVALID_SLICE: dimension must be ':', scalar index, or [start end] range",
2865 )),
2866 }
2867}
2868
2869fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
2870 if index.len() != shape.len() {
2871 return Err(data_error("INVALID_SLICE: rank mismatch"));
2872 }
2873 let mut stride = 1usize;
2874 let mut linear = 0usize;
2875 for (idx, extent) in index.iter().zip(shape.iter()) {
2876 if *idx >= *extent {
2877 return Err(data_error("INVALID_SLICE: index out of bounds"));
2878 }
2879 linear += idx * stride;
2880 stride = stride.saturating_mul(*extent);
2881 }
2882 Ok(linear)
2883}
2884
/// Steps `index` to the next position within `shape`, with the first
/// dimension varying fastest.
///
/// Returns `true` when `index` now holds a new position. Returns `false` when
/// the walk wrapped around (every component is reset to zero) or when `shape`
/// is empty.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    for (dim, &extent) in shape.iter().enumerate() {
        index[dim] += 1;
        if index[dim] < extent {
            return true;
        }
        // This dimension overflowed: reset it and carry into the next one.
        index[dim] = 0;
    }
    false
}
2898
/// Formats chunk coordinates as a dot-separated key, e.g. `[1, 2]` -> `"1.2"`.
/// Empty coordinates produce an empty string.
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (position, coord) in coords.iter().enumerate() {
        if position > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
2906
/// Computes the element index at which the chunk at `coords` begins, per
/// dimension: `coord * chunk_len`.
///
/// A missing or zero chunk length is treated as 1.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(coords.len());
    for (dim, &coord) in coords.iter().enumerate() {
        let chunk_len = match chunk_shape.get(dim) {
            Some(&len) if len > 0 => len,
            _ => 1,
        };
        starts.push(coord * chunk_len);
    }
    starts
}
2914
/// Computes the per-dimension size of the chunk beginning at `start`, clipped
/// to the array's `shape`.
///
/// A missing or zero chunk length is treated as 1. A start at or beyond the
/// array's extent yields 0 for that dimension.
fn chunk_extent_for_start(start: &[usize], chunk_shape: &[usize], shape: &[usize]) -> Vec<usize> {
    let mut extents = Vec::with_capacity(start.len());
    for (dim, &origin) in start.iter().enumerate() {
        let chunk_len = match chunk_shape.get(dim) {
            Some(&len) if len > 0 => len,
            _ => 1,
        };
        let upper = (origin + chunk_len).min(shape[dim]);
        extents.push(upper.saturating_sub(origin));
    }
    extents
}
2926
2927fn chunk_intersection(
2928 ranges: &[DimRange],
2929 chunk_start: &[usize],
2930 chunk_extent: &[usize],
2931) -> Vec<DimRange> {
2932 let mut out = Vec::with_capacity(ranges.len());
2933 for dim in 0..ranges.len() {
2934 let c_start = chunk_start[dim];
2935 let c_end = c_start + chunk_extent[dim];
2936 let start = ranges[dim].start.max(c_start);
2937 let end = ranges[dim].end.min(c_end);
2938 if start >= end {
2939 return Vec::new();
2940 }
2941 out.push(DimRange { start, end });
2942 }
2943 out
2944}
2945
2946fn touched_chunk_coords(
2947 ranges: &[DimRange],
2948 chunk_shape: &[usize],
2949 shape: &[usize],
2950) -> Vec<Vec<usize>> {
2951 let mut span = Vec::with_capacity(ranges.len());
2952 let mut begin = Vec::with_capacity(ranges.len());
2953 for dim in 0..ranges.len() {
2954 if shape[dim] == 0 {
2955 return Vec::new();
2956 }
2957 let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
2958 let first = ranges[dim].start / chunk;
2959 let last = (ranges[dim].end.saturating_sub(1)) / chunk;
2960 begin.push(first);
2961 span.push(last.saturating_sub(first) + 1);
2962 }
2963 let mut local = vec![0usize; span.len()];
2964 let mut out = Vec::new();
2965 loop {
2966 out.push(
2967 local
2968 .iter()
2969 .enumerate()
2970 .map(|(dim, v)| begin[dim] + *v)
2971 .collect::<Vec<_>>(),
2972 );
2973 if !advance_index(&mut local, &span) {
2974 break;
2975 }
2976 }
2977 out
2978}
2979
2980async fn maybe_upload_chunk_batch_async(
2981 root: &std::path::Path,
2982 array_name: &str,
2983 batch: Vec<(DataChunkDescriptor, Vec<u8>)>,
2984) -> BuiltinResult<()> {
2985 if batch.is_empty() {
2986 return Ok(());
2987 }
2988 let request = DataChunkUploadRequest {
2989 dataset_path: root.to_string_lossy().to_string(),
2990 array: array_name.to_string(),
2991 chunks: batch.iter().map(|(d, _)| d.clone()).collect(),
2992 };
2993 let targets = match runmat_filesystem::data_chunk_upload_targets_async(&request).await {
2994 Ok(targets) => targets,
2995 Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
2996 Err(err) => {
2997 return Err(data_error(format!(
2998 "failed to request data chunk upload targets: {err}"
2999 )))
3000 }
3001 };
3002 for (descriptor, bytes) in batch {
3003 let target = targets
3004 .iter()
3005 .find(|t| t.key == descriptor.key)
3006 .ok_or_else(|| {
3007 data_error(format!(
3008 "missing upload target for chunk '{}'",
3009 descriptor.key
3010 ))
3011 })?;
3012 runmat_filesystem::data_upload_chunk_async(target, &bytes)
3013 .await
3014 .map_err(|err| {
3015 data_error(format!(
3016 "failed to upload chunk '{}': {err}",
3017 descriptor.key
3018 ))
3019 })?;
3020 tracing::info!(
3021 target: "runmat.data",
3022 dataset = %root.display(),
3023 array = array_name,
3024 chunk_key = descriptor.key,
3025 bytes = bytes.len(),
3026 "chunk upload completed"
3027 );
3028 }
3029 Ok(())
3030}
3031
/// Builds the dataset-relative path of a chunk payload file:
/// `arrays/<array_name>/chunks/<object_id>.json`.
fn chunk_rel_path(array_name: &str, object_id: &str) -> String {
    ["arrays/", array_name, "/chunks/", object_id, ".json"].concat()
}
3035
3036async fn load_or_init_chunk(
3037 root: &std::path::Path,
3038 array_name: &str,
3039 key: &str,
3040 coords: &[usize],
3041 chunk_extent: &[usize],
3042 pos_by_key: &HashMap<String, usize>,
3043 chunk_index: &DataChunkIndex,
3044) -> BuiltinResult<(usize, bool, DataChunkIndexEntry, DataArrayPayload)> {
3045 if let Some(index) = pos_by_key.get(key).copied() {
3046 let entry = chunk_index
3047 .chunks
3048 .get(index)
3049 .cloned()
3050 .ok_or_else(|| data_error(format!("chunk index missing key '{key}'")))?;
3051 let bytes = runmat_filesystem::read_async(root.join(&entry.data_path))
3052 .await
3053 .map_err(|err| {
3054 data_error(format!(
3055 "failed to read chunk payload '{}': {err}",
3056 entry.data_path
3057 ))
3058 })?;
3059 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
3060 data_error(format!(
3061 "failed to parse chunk payload '{}': {err}",
3062 entry.data_path
3063 ))
3064 })?;
3065 return Ok((index, true, entry, payload));
3066 }
3067
3068 let object_id = format!("obj_{}", key.replace('.', "_"));
3069 let entry = DataChunkIndexEntry {
3070 key: key.to_string(),
3071 object_id: object_id.clone(),
3072 hash: String::new(),
3073 bytes_raw: 0,
3074 bytes_stored: 0,
3075 coords: coords.to_vec(),
3076 shape: chunk_extent.to_vec(),
3077 data_path: chunk_rel_path(array_name, &object_id),
3078 };
3079 let payload = DataArrayPayload {
3080 dtype: "f64".to_string(),
3081 shape: chunk_extent.to_vec(),
3082 values: vec![0.0; chunk_extent.iter().copied().product()],
3083 };
3084 Ok((chunk_index.chunks.len(), false, entry, payload))
3085}
3086
3087fn attrs_to_struct(attrs: &BTreeMap<String, serde_json::Value>) -> Value {
3088 let mut out = StructValue::new();
3089 for (k, v) in attrs {
3090 out.fields.insert(k.clone(), json_to_value(v));
3091 }
3092 Value::Struct(out)
3093}
3094
3095fn value_to_json(value: &Value) -> serde_json::Value {
3096 match value {
3097 Value::String(s) => serde_json::Value::String(s.clone()),
3098 Value::CharArray(ca) => serde_json::Value::String(ca.to_string()),
3099 Value::Num(n) => serde_json::json!(n),
3100 Value::Int(i) => serde_json::json!(i.to_i64()),
3101 Value::Bool(b) => serde_json::json!(b),
3102 _ => serde_json::Value::String(format!("{value:?}")),
3103 }
3104}
3105
3106fn json_to_value(value: &serde_json::Value) -> Value {
3107 match value {
3108 serde_json::Value::Bool(b) => Value::Bool(*b),
3109 serde_json::Value::Number(n) => Value::Num(n.as_f64().unwrap_or_default()),
3110 serde_json::Value::String(s) => Value::String(s.clone()),
3111 serde_json::Value::Array(arr) => {
3112 let vals = arr.iter().map(json_to_value).collect::<Vec<_>>();
3113 crate::make_cell(vals.clone(), 1, vals.len())
3114 .unwrap_or_else(|_| Value::String("<invalid-array>".to_string()))
3115 }
3116 serde_json::Value::Object(map) => {
3117 let mut s = StructValue::new();
3118 for (k, v) in map {
3119 s.fields.insert(k.clone(), json_to_value(v));
3120 }
3121 Value::Struct(s)
3122 }
3123 serde_json::Value::Null => Value::String("".to_string()),
3124 }
3125}
3126
3127#[cfg(all(test, not(target_arch = "wasm32")))]
3128mod tests {
3129 use super::*;
3130 use crate::dispatcher::call_builtin;
3131 use async_trait::async_trait;
3132 use axum::extract::{Query, State};
3133 use axum::http::{HeaderMap, StatusCode};
3134 use axum::routing::{post, put};
3135 use axum::{Json, Router};
3136 use runmat_builtins::CellArray;
3137 use runmat_filesystem::data_contract::{
3138 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
3139 };
3140 use runmat_filesystem::{
3141 DirEntry, FileHandle, FsMetadata, FsProvider, NativeFsProvider, OpenFlags,
3142 };
3143 use serde::Deserialize;
3144 use std::path::Path;
3145 use std::sync::{Arc, Mutex, MutexGuard};
3146 use tokio::runtime::Runtime;
3147 use tokio::sync::oneshot;
3148
3149 fn serial_test_guard() -> MutexGuard<'static, ()> {
3150 runmat_filesystem::provider_override_lock()
3151 }
3152
3153 fn native_provider_guard() -> runmat_filesystem::ProviderGuard {
3154 runmat_filesystem::replace_provider(Arc::new(NativeFsProvider))
3155 }
3156
3157 #[test]
3158 fn io_data_descriptors_cover_constructor_and_transaction_surface() {
3159 let data_labels: Vec<&str> = DATA_CREATE_DESCRIPTOR
3160 .signatures
3161 .iter()
3162 .map(|sig| sig.label)
3163 .collect();
3164 assert!(data_labels.contains(&"ds = data.create(path, schema, Name, Value, ...)"));
3165
3166 let read_labels: Vec<&str> = DATAARRAY_READ_DESCRIPTOR
3167 .signatures
3168 .iter()
3169 .map(|sig| sig.label)
3170 .collect();
3171 assert!(read_labels.contains(&"X = DataArray.read(arr, sliceSpec)"));
3172
3173 let write_labels: Vec<&str> = DATAARRAY_WRITE_DESCRIPTOR
3174 .signatures
3175 .iter()
3176 .map(|sig| sig.label)
3177 .collect();
3178 assert!(write_labels.contains(&"tf = DataArray.write(arr, values)"));
3179 assert!(write_labels.contains(&"tf = DataArray.write(arr, sliceSpec, values)"));
3180
3181 let tx_labels: Vec<&str> = DATATX_COMMIT_DESCRIPTOR
3182 .signatures
3183 .iter()
3184 .map(|sig| sig.label)
3185 .collect();
3186 assert!(tx_labels.contains(&"tf = DataTransaction.commit(tx, Name, Value, ...)"));
3187 }
3188
    /// Test filesystem provider that records the key of every chunk passed to
    /// `data_upload_chunk` instead of uploading it anywhere.
    #[derive(Default)]
    struct CountingDataUploadProvider {
        /// Native provider that the ordinary filesystem calls are forwarded to.
        inner: NativeFsProvider,
        /// Keys of the chunks uploaded so far, in call order; shared with tests
        /// through `uploaded_keys()`.
        uploaded_keys: Arc<Mutex<Vec<String>>>,
    }
3194
    /// Test filesystem provider that sends the chunk-upload calls to an HTTP
    /// server while forwarding ordinary filesystem calls to the native provider.
    struct HttpDataUploadProvider {
        /// Native provider used for all non-upload operations.
        inner: NativeFsProvider,
        /// Base URL of the upload server, prepended to the endpoint paths.
        base_url: String,
        /// Blocking HTTP client used for the target and upload requests.
        client: reqwest::blocking::Client,
    }
3200
3201 impl HttpDataUploadProvider {
3202 fn new(base_url: String) -> Self {
3203 Self {
3204 inner: NativeFsProvider,
3205 base_url,
3206 client: reqwest::blocking::Client::new(),
3207 }
3208 }
3209 }
3210
3211 #[async_trait(?Send)]
3212 impl FsProvider for HttpDataUploadProvider {
3213 fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
3214 self.inner.open(path, flags)
3215 }
3216
3217 async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
3218 self.inner.read(path).await
3219 }
3220
3221 async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
3222 self.inner.write(path, data).await
3223 }
3224
3225 async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
3226 self.inner.remove_file(path).await
3227 }
3228
3229 async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3230 self.inner.metadata(path).await
3231 }
3232
3233 async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3234 self.inner.symlink_metadata(path).await
3235 }
3236
3237 async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
3238 self.inner.read_dir(path).await
3239 }
3240
3241 async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
3242 self.inner.canonicalize(path).await
3243 }
3244
3245 async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
3246 self.inner.create_dir(path).await
3247 }
3248
3249 async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
3250 self.inner.create_dir_all(path).await
3251 }
3252
3253 async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
3254 self.inner.remove_dir(path).await
3255 }
3256
3257 async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
3258 self.inner.remove_dir_all(path).await
3259 }
3260
3261 async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
3262 self.inner.rename(from, to).await
3263 }
3264
3265 async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
3266 self.inner.set_readonly(path, readonly).await
3267 }
3268
3269 async fn data_manifest_descriptor(
3270 &self,
3271 request: &DataManifestRequest,
3272 ) -> std::io::Result<DataManifestDescriptor> {
3273 self.inner.data_manifest_descriptor(request).await
3274 }
3275
3276 async fn data_chunk_upload_targets(
3277 &self,
3278 request: &DataChunkUploadRequest,
3279 ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
3280 #[derive(Deserialize)]
3281 struct UploadTargetsResponse {
3282 targets: Vec<DataChunkUploadTarget>,
3283 }
3284 let url = format!("{}/data/chunks/upload-targets", self.base_url);
3285 let response = self
3286 .client
3287 .post(url)
3288 .json(request)
3289 .send()
3290 .map_err(|err| std::io::Error::other(err.to_string()))?;
3291 if !response.status().is_success() {
3292 return Err(std::io::Error::other(format!(
3293 "upload targets request failed: {}",
3294 response.status()
3295 )));
3296 }
3297 let parsed: UploadTargetsResponse = response
3298 .json()
3299 .map_err(|err| std::io::Error::other(err.to_string()))?;
3300 Ok(parsed.targets)
3301 }
3302
3303 async fn data_upload_chunk(
3304 &self,
3305 target: &DataChunkUploadTarget,
3306 data: &[u8],
3307 ) -> std::io::Result<()> {
3308 let upload_url = if let Some(key) = target.upload_url.strip_prefix("upload://") {
3309 format!("{}/upload?key={}", self.base_url, key)
3310 } else {
3311 target.upload_url.clone()
3312 };
3313 let method = reqwest::Method::from_bytes(target.method.as_bytes())
3314 .map_err(|err| std::io::Error::other(err.to_string()))?;
3315 let mut request = self.client.request(method, &upload_url);
3316 for (k, v) in &target.headers {
3317 request = request.header(k, v);
3318 }
3319 let response = request
3320 .body(data.to_vec())
3321 .send()
3322 .map_err(|err| std::io::Error::other(err.to_string()))?;
3323 if !response.status().is_success() {
3324 return Err(std::io::Error::other(format!(
3325 "chunk upload failed: {}",
3326 response.status()
3327 )));
3328 }
3329 Ok(())
3330 }
3331 }
3332
    /// Shared state of the test upload server.
    #[derive(Clone, Default)]
    struct UploadHarness {
        /// Chunk keys received by `upload_handler`, in arrival order.
        uploads: Arc<Mutex<Vec<String>>>,
    }
3337
    /// Query string accepted by the `/upload` route.
    #[derive(Deserialize)]
    struct UploadChunkQuery {
        /// Chunk key taken from the `key` query parameter.
        key: String,
    }
3342
3343 async fn upload_targets_handler(
3344 Json(req): Json<DataChunkUploadRequest>,
3345 ) -> Result<Json<serde_json::Value>, StatusCode> {
3346 let targets = req
3347 .chunks
3348 .iter()
3349 .map(|chunk| {
3350 serde_json::json!({
3351 "key": chunk.key,
3352 "method": "PUT",
3353 "upload_url": format!("upload://{}", chunk.key),
3354 "headers": {
3355 "x-runmat-hash": chunk.hash,
3356 }
3357 })
3358 })
3359 .collect::<Vec<_>>();
3360 Ok(Json(serde_json::json!({ "targets": targets })))
3361 }
3362
3363 async fn upload_handler(
3364 State(harness): State<UploadHarness>,
3365 Query(query): Query<UploadChunkQuery>,
3366 headers: HeaderMap,
3367 body: axum::body::Bytes,
3368 ) -> Result<(), StatusCode> {
3369 if body.is_empty() {
3370 return Err(StatusCode::BAD_REQUEST);
3371 }
3372 if headers.get("x-runmat-hash").is_none() {
3373 return Err(StatusCode::BAD_REQUEST);
3374 }
3375 let mut guard = harness.uploads.lock().expect("uploads lock poisoned");
3376 guard.push(query.key);
3377 Ok(())
3378 }
3379
3380 fn spawn_upload_server() -> (
3381 String,
3382 Arc<Mutex<Vec<String>>>,
3383 Runtime,
3384 oneshot::Sender<()>,
3385 ) {
3386 let harness = UploadHarness::default();
3387 let uploads = Arc::clone(&harness.uploads);
3388 let runtime = Runtime::new().expect("tokio runtime");
3389 let (addr, shutdown_tx) = runtime.block_on(async move {
3390 let listener = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
3391 .await
3392 .expect("bind upload server");
3393 let addr = listener.local_addr().expect("local addr");
3394 let app = Router::new()
3395 .route("/data/chunks/upload-targets", post(upload_targets_handler))
3396 .route("/upload", put(upload_handler))
3397 .with_state(harness);
3398 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
3399 let server = axum::serve(listener, app).with_graceful_shutdown(async {
3400 let _ = shutdown_rx.await;
3401 });
3402 tokio::spawn(async move {
3403 let _ = server.await;
3404 });
3405 (addr, shutdown_tx)
3406 });
3407 (format!("http://{}", addr), uploads, runtime, shutdown_tx)
3408 }
3409
3410 impl CountingDataUploadProvider {
3411 fn uploaded_keys(&self) -> Arc<Mutex<Vec<String>>> {
3412 Arc::clone(&self.uploaded_keys)
3413 }
3414 }
3415
3416 #[async_trait(?Send)]
3417 impl FsProvider for CountingDataUploadProvider {
3418 fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
3419 self.inner.open(path, flags)
3420 }
3421
3422 async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
3423 self.inner.read(path).await
3424 }
3425
3426 async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
3427 self.inner.write(path, data).await
3428 }
3429
3430 async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
3431 self.inner.remove_file(path).await
3432 }
3433
3434 async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3435 self.inner.metadata(path).await
3436 }
3437
3438 async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
3439 self.inner.symlink_metadata(path).await
3440 }
3441
3442 async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
3443 self.inner.read_dir(path).await
3444 }
3445
3446 async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
3447 self.inner.canonicalize(path).await
3448 }
3449
3450 async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
3451 self.inner.create_dir(path).await
3452 }
3453
3454 async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
3455 self.inner.create_dir_all(path).await
3456 }
3457
3458 async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
3459 self.inner.remove_dir(path).await
3460 }
3461
3462 async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
3463 self.inner.remove_dir_all(path).await
3464 }
3465
3466 async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
3467 self.inner.rename(from, to).await
3468 }
3469
3470 async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
3471 self.inner.set_readonly(path, readonly).await
3472 }
3473
3474 async fn data_manifest_descriptor(
3475 &self,
3476 request: &DataManifestRequest,
3477 ) -> std::io::Result<DataManifestDescriptor> {
3478 self.inner.data_manifest_descriptor(request).await
3479 }
3480
3481 async fn data_chunk_upload_targets(
3482 &self,
3483 request: &DataChunkUploadRequest,
3484 ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
3485 Ok(request
3486 .chunks
3487 .iter()
3488 .map(|chunk| DataChunkUploadTarget {
3489 key: chunk.key.clone(),
3490 method: "PUT".to_string(),
3491 upload_url: format!("count://{}", chunk.object_id),
3492 headers: std::collections::HashMap::new(),
3493 })
3494 .collect())
3495 }
3496
3497 async fn data_upload_chunk(
3498 &self,
3499 target: &DataChunkUploadTarget,
3500 _data: &[u8],
3501 ) -> std::io::Result<()> {
3502 let mut guard = match self.uploaded_keys.lock() {
3503 Ok(guard) => guard,
3504 Err(poisoned) => poisoned.into_inner(),
3505 };
3506 guard.push(target.key.clone());
3507 Ok(())
3508 }
3509 }
3510
3511 #[test]
3512 fn create_open_write_read_dataset() {
3513 let _serial = serial_test_guard();
3514 let _provider_guard = native_provider_guard();
3515 let dir = tempfile::tempdir().expect("tempdir");
3516 let path = dir.path().join("sample.data").to_string_lossy().to_string();
3517
3518 let mut array_meta = StructValue::new();
3519 array_meta
3520 .fields
3521 .insert("dtype".to_string(), Value::String("f64".to_string()));
3522 array_meta.fields.insert(
3523 "shape".to_string(),
3524 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
3525 );
3526 let mut arrays = StructValue::new();
3527 arrays
3528 .fields
3529 .insert("temperature".to_string(), Value::Struct(array_meta));
3530 let mut schema = StructValue::new();
3531 schema
3532 .fields
3533 .insert("arrays".to_string(), Value::Struct(arrays));
3534
3535 let ds = call_builtin(
3536 "data.create",
3537 &[
3538 Value::String(path.clone()),
3539 Value::Struct(schema),
3540 Value::Cell(runmat_builtins::CellArray::new(vec![], 1, 0).expect("cell")),
3541 ],
3542 )
3543 .expect("create dataset");
3544
3545 let arr = call_builtin(
3546 "Dataset.array",
3547 &[ds, Value::String("temperature".to_string())],
3548 )
3549 .expect("dataset array");
3550 let write_tensor = Tensor::new(vec![1.0, 2.0, 3.0, 4.0], vec![2, 2]).expect("write tensor");
3551 call_builtin(
3552 "DataArray.write",
3553 &[arr.clone(), Value::Tensor(write_tensor)],
3554 )
3555 .expect("write array");
3556
3557 let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
3558 let Value::Tensor(t) = read_back else {
3559 panic!("expected tensor");
3560 };
3561 assert_eq!(t.shape, vec![2, 2]);
3562 assert_eq!(t.data, vec![1.0, 2.0, 3.0, 4.0]);
3563 }
3564
3565 #[test]
3566 fn write_and_read_slice_payload() {
3567 let _serial = serial_test_guard();
3568 let _provider_guard = native_provider_guard();
3569 let dir = tempfile::tempdir().expect("tempdir");
3570 let path = dir.path().join("slice.data").to_string_lossy().to_string();
3571
3572 let mut array_meta = StructValue::new();
3573 array_meta
3574 .fields
3575 .insert("dtype".to_string(), Value::String("f64".to_string()));
3576 array_meta.fields.insert(
3577 "shape".to_string(),
3578 Value::Tensor(Tensor::new(vec![3.0, 3.0], vec![1, 2]).expect("shape tensor")),
3579 );
3580 let mut arrays = StructValue::new();
3581 arrays
3582 .fields
3583 .insert("temperature".to_string(), Value::Struct(array_meta));
3584 let mut schema = StructValue::new();
3585 schema
3586 .fields
3587 .insert("arrays".to_string(), Value::Struct(arrays));
3588
3589 let ds = call_builtin(
3590 "data.create",
3591 &[
3592 Value::String(path.clone()),
3593 Value::Struct(schema),
3594 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3595 ],
3596 )
3597 .expect("create dataset");
3598 let arr = call_builtin(
3599 "Dataset.array",
3600 &[ds, Value::String("temperature".to_string())],
3601 )
3602 .expect("dataset array");
3603
3604 let slice = Value::Cell(
3605 CellArray::new(
3606 vec![
3607 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3608 Value::String(":".to_string()),
3609 ],
3610 1,
3611 2,
3612 )
3613 .expect("slice cell"),
3614 );
3615 let rhs = Value::Tensor(
3616 Tensor::new(vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0], vec![2, 3]).expect("rhs"),
3617 );
3618 call_builtin("DataArray.write", &[arr.clone(), slice.clone(), rhs]).expect("slice write");
3619
3620 let read_back = call_builtin("DataArray.read", &[arr.clone(), slice]).expect("slice read");
3621 let Value::Tensor(t) = read_back else {
3622 panic!("expected tensor");
3623 };
3624 assert_eq!(t.shape, vec![2, 3]);
3625 assert_eq!(t.data, vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0]);
3626 }
3627
3628 #[test]
3629 fn slice_write_updates_only_touched_chunks() {
3630 let _serial = serial_test_guard();
3631 let _provider_guard = native_provider_guard();
3632 let dir = tempfile::tempdir().expect("tempdir");
3633 let path = dir
3634 .path()
3635 .join("chunked.data")
3636 .to_string_lossy()
3637 .to_string();
3638
3639 let mut array_meta = StructValue::new();
3640 array_meta
3641 .fields
3642 .insert("dtype".to_string(), Value::String("f64".to_string()));
3643 array_meta.fields.insert(
3644 "shape".to_string(),
3645 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3646 );
3647 array_meta.fields.insert(
3648 "chunk".to_string(),
3649 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3650 );
3651 let mut arrays = StructValue::new();
3652 arrays
3653 .fields
3654 .insert("temperature".to_string(), Value::Struct(array_meta));
3655 let mut schema = StructValue::new();
3656 schema
3657 .fields
3658 .insert("arrays".to_string(), Value::Struct(arrays));
3659
3660 let ds = call_builtin(
3661 "data.create",
3662 &[
3663 Value::String(path.clone()),
3664 Value::Struct(schema),
3665 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3666 ],
3667 )
3668 .expect("create dataset");
3669 let arr = call_builtin(
3670 "Dataset.array",
3671 &[ds, Value::String("temperature".to_string())],
3672 )
3673 .expect("dataset array");
3674
3675 let full = Value::Tensor(
3676 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4]).expect("full tensor"),
3677 );
3678 call_builtin("DataArray.write", &[arr.clone(), full]).expect("initial write");
3679
3680 let root = std::path::PathBuf::from(&path);
3681 let untouched_path = root.join("arrays/temperature/chunks/obj_1_1.json");
3682 let touched_path = root.join("arrays/temperature/chunks/obj_0_0.json");
3683 let untouched_before =
3684 futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
3685 .expect("read untouched before");
3686 let touched_before =
3687 futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
3688 .expect("read touched before");
3689
3690 let slice = Value::Cell(
3691 CellArray::new(
3692 vec![
3693 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3694 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3695 ],
3696 1,
3697 2,
3698 )
3699 .expect("slice cell"),
3700 );
3701 let rhs =
3702 Value::Tensor(Tensor::new(vec![99.0, 98.0, 97.0, 96.0], vec![2, 2]).expect("rhs"));
3703 call_builtin("DataArray.write", &[arr.clone(), slice, rhs]).expect("slice write");
3704
3705 let untouched_after =
3706 futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
3707 .expect("read untouched after");
3708 let touched_after =
3709 futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
3710 .expect("read touched after");
3711 assert_eq!(untouched_before, untouched_after);
3712 assert_ne!(touched_before, touched_after);
3713 }
3714
3715 #[test]
3716 fn slice_write_uploads_only_touched_chunk_targets() {
3717 let _serial = serial_test_guard();
3718 let provider = Arc::new(CountingDataUploadProvider::default());
3719 let uploaded = provider.uploaded_keys();
3720 let _guard = runmat_filesystem::replace_provider(provider);
3721
3722 let dir = tempfile::tempdir().expect("tempdir");
3723 let path = dir
3724 .path()
3725 .join("remote-chunked.data")
3726 .to_string_lossy()
3727 .to_string();
3728
3729 let mut array_meta = StructValue::new();
3730 array_meta
3731 .fields
3732 .insert("dtype".to_string(), Value::String("f64".to_string()));
3733 array_meta.fields.insert(
3734 "shape".to_string(),
3735 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3736 );
3737 array_meta.fields.insert(
3738 "chunk".to_string(),
3739 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3740 );
3741 let mut arrays = StructValue::new();
3742 arrays
3743 .fields
3744 .insert("temperature".to_string(), Value::Struct(array_meta));
3745 let mut schema = StructValue::new();
3746 schema
3747 .fields
3748 .insert("arrays".to_string(), Value::Struct(arrays));
3749
3750 let ds = call_builtin(
3751 "data.create",
3752 &[
3753 Value::String(path.clone()),
3754 Value::Struct(schema),
3755 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3756 ],
3757 )
3758 .expect("create dataset");
3759 let arr = call_builtin(
3760 "Dataset.array",
3761 &[ds, Value::String("temperature".to_string())],
3762 )
3763 .expect("dataset array");
3764
3765 call_builtin(
3766 "DataArray.write",
3767 &[
3768 arr.clone(),
3769 Value::Tensor(
3770 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
3771 .expect("full tensor"),
3772 ),
3773 ],
3774 )
3775 .expect("initial write");
3776
3777 let manifest =
3778 futures::executor::block_on(crate::data::read_manifest_async(&dataset_root(&path)))
3779 .expect("manifest after initial write");
3780 let meta = manifest
3781 .arrays
3782 .get("temperature")
3783 .expect("temperature meta");
3784 let chunk_index_path =
3785 dataset_root(&path).join(meta.chunk_index_path.clone().expect("chunk index path"));
3786 assert!(
3787 futures::executor::block_on(runmat_filesystem::metadata_async(&chunk_index_path))
3788 .is_ok()
3789 );
3790
3791 {
3792 let mut keys = uploaded.lock().expect("uploaded keys lock");
3793 keys.clear();
3794 }
3795
3796 let slice = Value::Cell(
3797 CellArray::new(
3798 vec![
3799 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3800 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
3801 ],
3802 1,
3803 2,
3804 )
3805 .expect("slice cell"),
3806 );
3807 let rhs = Value::Tensor(Tensor::new(vec![9.0, 8.0, 7.0, 6.0], vec![2, 2]).expect("rhs"));
3808 call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
3809
3810 let keys = uploaded.lock().expect("uploaded keys lock");
3811 assert_eq!(keys.as_slice(), ["0.0".to_string()].as_slice());
3812 }
3813
3814 #[test]
3815 fn slice_write_uploads_expected_cross_boundary_chunk_targets() {
3816 let _serial = serial_test_guard();
3817 let provider = Arc::new(CountingDataUploadProvider::default());
3818 let uploaded = provider.uploaded_keys();
3819 let _guard = runmat_filesystem::replace_provider(provider);
3820
3821 let dir = tempfile::tempdir().expect("tempdir");
3822 let path = dir
3823 .path()
3824 .join("remote-chunked-boundary.data")
3825 .to_string_lossy()
3826 .to_string();
3827
3828 let mut array_meta = StructValue::new();
3829 array_meta
3830 .fields
3831 .insert("dtype".to_string(), Value::String("f64".to_string()));
3832 array_meta.fields.insert(
3833 "shape".to_string(),
3834 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3835 );
3836 array_meta.fields.insert(
3837 "chunk".to_string(),
3838 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3839 );
3840 let mut arrays = StructValue::new();
3841 arrays
3842 .fields
3843 .insert("temperature".to_string(), Value::Struct(array_meta));
3844 let mut schema = StructValue::new();
3845 schema
3846 .fields
3847 .insert("arrays".to_string(), Value::Struct(arrays));
3848
3849 let ds = call_builtin(
3850 "data.create",
3851 &[
3852 Value::String(path.clone()),
3853 Value::Struct(schema),
3854 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3855 ],
3856 )
3857 .expect("create dataset");
3858 let arr = call_builtin(
3859 "Dataset.array",
3860 &[ds, Value::String("temperature".to_string())],
3861 )
3862 .expect("dataset array");
3863
3864 call_builtin(
3865 "DataArray.write",
3866 &[
3867 arr.clone(),
3868 Value::Tensor(
3869 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
3870 .expect("full tensor"),
3871 ),
3872 ],
3873 )
3874 .expect("initial write");
3875 {
3876 let mut keys = uploaded.lock().expect("uploaded keys lock");
3877 keys.clear();
3878 }
3879
3880 let slice = Value::Cell(
3881 CellArray::new(
3882 vec![
3883 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3884 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3885 ],
3886 1,
3887 2,
3888 )
3889 .expect("slice cell"),
3890 );
3891 let rhs =
3892 Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
3893 call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
3894
3895 let mut keys = uploaded.lock().expect("uploaded keys lock").clone();
3896 keys.sort();
3897 keys.dedup();
3898 assert_eq!(
3899 keys.as_slice(),
3900 [
3901 "0.0".to_string(),
3902 "0.1".to_string(),
3903 "1.0".to_string(),
3904 "1.1".to_string(),
3905 ]
3906 .as_slice()
3907 );
3908 }
3909
3910 #[test]
3911 fn slice_write_hits_http_server_data_endpoints_with_expected_keys() {
3912 let _serial = serial_test_guard();
3913 let (base_url, uploads, runtime, shutdown_tx) = spawn_upload_server();
3914 let provider = Arc::new(HttpDataUploadProvider::new(base_url));
3915 let _guard = runmat_filesystem::replace_provider(provider);
3916
3917 let dir = tempfile::tempdir().expect("tempdir");
3918 let path = dir
3919 .path()
3920 .join("http-endpoint.data")
3921 .to_string_lossy()
3922 .to_string();
3923
3924 let mut array_meta = StructValue::new();
3925 array_meta
3926 .fields
3927 .insert("dtype".to_string(), Value::String("f64".to_string()));
3928 array_meta.fields.insert(
3929 "shape".to_string(),
3930 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
3931 );
3932 array_meta.fields.insert(
3933 "chunk".to_string(),
3934 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
3935 );
3936 let mut arrays = StructValue::new();
3937 arrays
3938 .fields
3939 .insert("temperature".to_string(), Value::Struct(array_meta));
3940 let mut schema = StructValue::new();
3941 schema
3942 .fields
3943 .insert("arrays".to_string(), Value::Struct(arrays));
3944
3945 let ds = call_builtin(
3946 "data.create",
3947 &[
3948 Value::String(path.clone()),
3949 Value::Struct(schema),
3950 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3951 ],
3952 )
3953 .expect("create dataset");
3954 let arr = call_builtin(
3955 "Dataset.array",
3956 &[ds, Value::String("temperature".to_string())],
3957 )
3958 .expect("dataset array");
3959
3960 call_builtin(
3961 "DataArray.write",
3962 &[
3963 arr.clone(),
3964 Value::Tensor(
3965 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
3966 .expect("full tensor"),
3967 ),
3968 ],
3969 )
3970 .expect("initial write");
3971
3972 {
3973 let mut keys = uploads.lock().expect("uploads lock");
3974 keys.clear();
3975 }
3976
3977 let slice = Value::Cell(
3978 CellArray::new(
3979 vec![
3980 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3981 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
3982 ],
3983 1,
3984 2,
3985 )
3986 .expect("slice cell"),
3987 );
3988 let rhs =
3989 Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
3990 call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
3991
3992 let mut keys = uploads.lock().expect("uploads lock").clone();
3993 keys.sort();
3994 keys.dedup();
3995 assert_eq!(
3996 keys.as_slice(),
3997 [
3998 "0.0".to_string(),
3999 "0.1".to_string(),
4000 "1.0".to_string(),
4001 "1.1".to_string(),
4002 ]
4003 .as_slice()
4004 );
4005
4006 let _ = shutdown_tx.send(());
4007 drop(runtime);
4008 }
4009
4010 #[test]
4011 fn tx_create_resize_fill_and_delete_array() {
4012 let _serial = serial_test_guard();
4013 let dir = tempfile::tempdir().expect("tempdir");
4014 let path = dir.path().join("tx-ops.data").to_string_lossy().to_string();
4015
4016 let mut arrays = StructValue::new();
4017 let mut array_meta = StructValue::new();
4018 array_meta
4019 .fields
4020 .insert("dtype".to_string(), Value::String("f64".to_string()));
4021 array_meta.fields.insert(
4022 "shape".to_string(),
4023 Value::Tensor(Tensor::new(vec![1.0, 1.0], vec![1, 2]).expect("shape tensor")),
4024 );
4025 arrays
4026 .fields
4027 .insert("base".to_string(), Value::Struct(array_meta));
4028 let mut schema = StructValue::new();
4029 schema
4030 .fields
4031 .insert("arrays".to_string(), Value::Struct(arrays));
4032
4033 let ds = call_builtin(
4034 "data.create",
4035 &[
4036 Value::String(path.clone()),
4037 Value::Struct(schema),
4038 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
4039 ],
4040 )
4041 .expect("create dataset");
4042
4043 let tx = call_builtin("Dataset.begin", &[ds]).expect("begin tx");
4044 let mut new_meta = StructValue::new();
4045 new_meta
4046 .fields
4047 .insert("dtype".to_string(), Value::String("f64".to_string()));
4048 new_meta.fields.insert(
4049 "shape".to_string(),
4050 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
4051 );
4052 call_builtin(
4053 "DataTransaction.create_array",
4054 &[
4055 tx.clone(),
4056 Value::String("new_array".to_string()),
4057 Value::Struct(new_meta),
4058 ],
4059 )
4060 .expect("create array in tx");
4061 call_builtin(
4062 "DataTransaction.resize",
4063 &[
4064 tx.clone(),
4065 Value::String("new_array".to_string()),
4066 Value::Tensor(Tensor::new(vec![3.0, 1.0], vec![1, 2]).expect("shape tensor")),
4067 ],
4068 )
4069 .expect("resize array in tx");
4070 call_builtin(
4071 "DataTransaction.fill",
4072 &[
4073 tx.clone(),
4074 Value::String("new_array".to_string()),
4075 Value::Num(7.0),
4076 ],
4077 )
4078 .expect("fill array in tx");
4079 call_builtin(
4080 "DataTransaction.delete_array",
4081 &[tx.clone(), Value::String("base".to_string())],
4082 )
4083 .expect("delete array in tx");
4084 call_builtin("DataTransaction.commit", &[tx]).expect("commit tx");
4085
4086 let ds = call_builtin(
4087 "data.open",
4088 &[
4089 Value::String(path),
4090 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
4091 ],
4092 )
4093 .expect("open dataset");
4094 let has_base = call_builtin(
4095 "Dataset.has_array",
4096 &[ds.clone(), Value::String("base".to_string())],
4097 )
4098 .expect("has base");
4099 assert_eq!(has_base, Value::Bool(false));
4100 let arr = call_builtin(
4101 "Dataset.array",
4102 &[ds, Value::String("new_array".to_string())],
4103 )
4104 .expect("new array");
4105 let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
4106 let Value::Tensor(t) = read_back else {
4107 panic!("expected tensor");
4108 };
4109 assert_eq!(t.shape, vec![3, 1]);
4110 assert_eq!(t.data, vec![7.0, 7.0, 7.0]);
4111 }
4112}