1use std::collections::BTreeMap;
4use std::collections::HashMap;
5use std::path::PathBuf;
6
7use runmat_builtins::{
8 BuiltinCompletionPolicy, BuiltinDescriptor, BuiltinErrorDescriptor, BuiltinOutputMode,
9 BuiltinParamArity, BuiltinParamDescriptor, BuiltinParamType, BuiltinSignatureDescriptor,
10 ObjectInstance, StructValue, Tensor, Value,
11};
12use runmat_filesystem::data_contract::{DataChunkDescriptor, DataChunkUploadRequest};
13use runmat_macros::runtime_builtin;
14
15use crate::builtins::common::spec::{
16 BroadcastSemantics, BuiltinFusionSpec, BuiltinGpuSpec, ConstantStrategy, GpuOpKind,
17 ReductionNaN, ResidencyPolicy, ShapeRequirements,
18};
19use crate::data::{
20 array_object, data_error, dataset_object, dataset_root, ensure_manifest_sequence,
21 get_object_prop, manifest_path, manifest_version_token, now_rfc3339, parse_schema,
22 parse_string, read_array_payload_async, read_manifest_async, remove_tx, sha256_hex, start_tx,
23 transaction_object, with_tx, with_tx_mut, write_array_payload_async, write_manifest_async,
24 DataArrayMeta, DataArrayPayload, DataChunkIndex, DataChunkIndexEntry, DataManifest,
25 PendingCreateArray, PendingFill, PendingResize, PendingWrite, TxnStatus,
26};
27use crate::{make_cell, BuiltinResult};
28
/// GPU planner spec shared by every `data.*` builtin.
///
/// The data builtins perform host-side I/O and manifest bookkeeping, so the
/// spec declares no supported precisions, no provider hooks and no broadcast
/// semantics, and asks for results to be gathered immediately.
#[runmat_macros::register_gpu_spec(builtin_path = "crate::builtins::io::data")]
pub const GPU_SPEC: BuiltinGpuSpec = BuiltinGpuSpec {
    name: "data.*",
    op_kind: GpuOpKind::Custom("io-data"),
    supported_precisions: &[],
    broadcast: BroadcastSemantics::None,
    provider_hooks: &[],
    constant_strategy: ConstantStrategy::InlineLiteral,
    residency: ResidencyPolicy::GatherImmediately,
    nan_mode: ReductionNaN::Include,
    two_pass_threshold: None,
    workgroup_size: None,
    accepts_nan_mode: false,
    notes: "Dataset operations are host I/O and metadata orchestration.",
};

/// Fusion planner spec shared by every `data.*` builtin.
///
/// The builtins are side-effecting, so no elementwise or reduction fusion
/// template is provided (`elementwise`/`reduction` are `None`).
#[runmat_macros::register_fusion_spec(builtin_path = "crate::builtins::io::data")]
pub const FUSION_SPEC: BuiltinFusionSpec = BuiltinFusionSpec {
    name: "data.*",
    shape: ShapeRequirements::Any,
    constant_strategy: ConstantStrategy::InlineLiteral,
    elementwise: None,
    reduction: None,
    emits_nan: false,
    notes: "Data builtins are side-effecting and not fusible.",
};
55
56const DATA_ERROR_INVALID_ARGUMENT: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
57 code: "RM.DATA.INVALID_ARGUMENT",
58 identifier: Some("RunMat:data:InvalidArgument"),
59 when: "Arguments, receiver object, or option grammar are invalid for the requested data API.",
60 message: "data: invalid argument",
61};
62
63const DATA_ERROR_NOT_FOUND: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
64 code: "RM.DATA.NOT_FOUND",
65 identifier: Some("RunMat:data:NotFound"),
66 when: "Referenced dataset/array/transaction object is missing or cannot be resolved.",
67 message: "data: requested object not found",
68};
69
70const DATA_ERROR_INTERNAL: BuiltinErrorDescriptor = BuiltinErrorDescriptor {
71 code: "RM.DATA.INTERNAL",
72 identifier: Some("RunMat:data:Internal"),
73 when: "Filesystem/manifest/chunk processing fails unexpectedly during data API execution.",
74 message: "data: internal operation failed",
75};
76
77const DATA_DESCRIPTOR_ERRORS: [BuiltinErrorDescriptor; 3] = [
78 DATA_ERROR_INVALID_ARGUMENT,
79 DATA_ERROR_NOT_FOUND,
80 DATA_ERROR_INTERNAL,
81];
82
83const OUT_DATASET: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
84 name: "ds",
85 ty: BuiltinParamType::Any,
86 arity: BuiltinParamArity::Required,
87 default: None,
88 description: "Dataset handle object.",
89}];
90
91const OUT_ARRAY: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
92 name: "arr",
93 ty: BuiltinParamType::Any,
94 arity: BuiltinParamArity::Required,
95 default: None,
96 description: "DataArray handle object.",
97}];
98
99const OUT_TX: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
100 name: "tx",
101 ty: BuiltinParamType::Any,
102 arity: BuiltinParamArity::Required,
103 default: None,
104 description: "DataTransaction handle object.",
105}];
106
107const OUT_BOOL: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
108 name: "ok",
109 ty: BuiltinParamType::LogicalArray,
110 arity: BuiltinParamArity::Required,
111 default: None,
112 description: "Logical success/result flag.",
113}];
114
115const OUT_STRING: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
116 name: "s",
117 ty: BuiltinParamType::StringScalar,
118 arity: BuiltinParamArity::Required,
119 default: None,
120 description: "String scalar result.",
121}];
122
123const OUT_STRUCT: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
124 name: "S",
125 ty: BuiltinParamType::Any,
126 arity: BuiltinParamArity::Required,
127 default: None,
128 description: "Struct result.",
129}];
130
131const OUT_CELL: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
132 name: "C",
133 ty: BuiltinParamType::Any,
134 arity: BuiltinParamArity::Required,
135 default: None,
136 description: "Cell-array result.",
137}];
138
139const OUT_VALUE: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
140 name: "value",
141 ty: BuiltinParamType::Any,
142 arity: BuiltinParamArity::Required,
143 default: None,
144 description: "Value result.",
145}];
146
147const OUT_TENSOR: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
148 name: "X",
149 ty: BuiltinParamType::NumericArray,
150 arity: BuiltinParamArity::Required,
151 default: None,
152 description: "Numeric tensor result.",
153}];
154
155const IN_PATH_SCHEMA_REST: [BuiltinParamDescriptor; 3] = [
156 BuiltinParamDescriptor {
157 name: "path",
158 ty: BuiltinParamType::StringScalar,
159 arity: BuiltinParamArity::Required,
160 default: None,
161 description: "Dataset path (.data).",
162 },
163 BuiltinParamDescriptor {
164 name: "schema",
165 ty: BuiltinParamType::Any,
166 arity: BuiltinParamArity::Required,
167 default: None,
168 description: "Dataset schema struct.",
169 },
170 BuiltinParamDescriptor {
171 name: "options",
172 ty: BuiltinParamType::Any,
173 arity: BuiltinParamArity::Variadic,
174 default: None,
175 description: "Reserved name/value options.",
176 },
177];
178
179const IN_PATH_REST: [BuiltinParamDescriptor; 2] = [
180 BuiltinParamDescriptor {
181 name: "path",
182 ty: BuiltinParamType::StringScalar,
183 arity: BuiltinParamArity::Required,
184 default: None,
185 description: "Dataset path (.data).",
186 },
187 BuiltinParamDescriptor {
188 name: "options",
189 ty: BuiltinParamType::Any,
190 arity: BuiltinParamArity::Variadic,
191 default: None,
192 description: "Reserved name/value options.",
193 },
194];
195
196const IN_FROM_TO_REST: [BuiltinParamDescriptor; 3] = [
197 BuiltinParamDescriptor {
198 name: "fromPath",
199 ty: BuiltinParamType::StringScalar,
200 arity: BuiltinParamArity::Required,
201 default: None,
202 description: "Source dataset path.",
203 },
204 BuiltinParamDescriptor {
205 name: "toPath",
206 ty: BuiltinParamType::StringScalar,
207 arity: BuiltinParamArity::Required,
208 default: None,
209 description: "Destination dataset path.",
210 },
211 BuiltinParamDescriptor {
212 name: "options",
213 ty: BuiltinParamType::Any,
214 arity: BuiltinParamArity::Variadic,
215 default: None,
216 description: "Reserved name/value options.",
217 },
218];
219
220const IN_PATH_FORMAT_TARGET_REST: [BuiltinParamDescriptor; 4] = [
221 BuiltinParamDescriptor {
222 name: "path",
223 ty: BuiltinParamType::StringScalar,
224 arity: BuiltinParamArity::Required,
225 default: None,
226 description: "Dataset path (.data).",
227 },
228 BuiltinParamDescriptor {
229 name: "format",
230 ty: BuiltinParamType::StringScalar,
231 arity: BuiltinParamArity::Required,
232 default: None,
233 description: "Format token (currently 'data').",
234 },
235 BuiltinParamDescriptor {
236 name: "targetPath",
237 ty: BuiltinParamType::StringScalar,
238 arity: BuiltinParamArity::Required,
239 default: None,
240 description: "Target dataset path.",
241 },
242 BuiltinParamDescriptor {
243 name: "options",
244 ty: BuiltinParamType::Any,
245 arity: BuiltinParamArity::Variadic,
246 default: None,
247 description: "Reserved name/value options.",
248 },
249];
250
251const IN_PREFIX_REST: [BuiltinParamDescriptor; 2] = [
252 BuiltinParamDescriptor {
253 name: "prefix",
254 ty: BuiltinParamType::StringScalar,
255 arity: BuiltinParamArity::Required,
256 default: None,
257 description: "Filesystem prefix to scan for datasets.",
258 },
259 BuiltinParamDescriptor {
260 name: "options",
261 ty: BuiltinParamType::Any,
262 arity: BuiltinParamArity::Variadic,
263 default: None,
264 description: "Reserved name/value options.",
265 },
266];
267
268const IN_BASE: [BuiltinParamDescriptor; 1] = [BuiltinParamDescriptor {
269 name: "obj",
270 ty: BuiltinParamType::Any,
271 arity: BuiltinParamArity::Required,
272 default: None,
273 description: "Receiver object.",
274}];
275
276const IN_BASE_NAME: [BuiltinParamDescriptor; 2] = [
277 BuiltinParamDescriptor {
278 name: "obj",
279 ty: BuiltinParamType::Any,
280 arity: BuiltinParamArity::Required,
281 default: None,
282 description: "Receiver object.",
283 },
284 BuiltinParamDescriptor {
285 name: "name",
286 ty: BuiltinParamType::StringScalar,
287 arity: BuiltinParamArity::Required,
288 default: None,
289 description: "Name key/array identifier.",
290 },
291];
292
293const IN_BASE_KEY_DEFAULT: [BuiltinParamDescriptor; 3] = [
294 BuiltinParamDescriptor {
295 name: "obj",
296 ty: BuiltinParamType::Any,
297 arity: BuiltinParamArity::Required,
298 default: None,
299 description: "Receiver object.",
300 },
301 BuiltinParamDescriptor {
302 name: "key",
303 ty: BuiltinParamType::StringScalar,
304 arity: BuiltinParamArity::Required,
305 default: None,
306 description: "Attribute key.",
307 },
308 BuiltinParamDescriptor {
309 name: "defaultValue",
310 ty: BuiltinParamType::Any,
311 arity: BuiltinParamArity::Optional,
312 default: Some("0"),
313 description: "Fallback value when key is absent.",
314 },
315];
316
317const IN_BASE_KEY_VALUE: [BuiltinParamDescriptor; 3] = [
318 BuiltinParamDescriptor {
319 name: "obj",
320 ty: BuiltinParamType::Any,
321 arity: BuiltinParamArity::Required,
322 default: None,
323 description: "Receiver object.",
324 },
325 BuiltinParamDescriptor {
326 name: "key",
327 ty: BuiltinParamType::StringScalar,
328 arity: BuiltinParamArity::Required,
329 default: None,
330 description: "Attribute key.",
331 },
332 BuiltinParamDescriptor {
333 name: "value",
334 ty: BuiltinParamType::Any,
335 arity: BuiltinParamArity::Required,
336 default: None,
337 description: "Attribute value.",
338 },
339];
340
341const IN_BASE_ATTRS: [BuiltinParamDescriptor; 2] = [
342 BuiltinParamDescriptor {
343 name: "obj",
344 ty: BuiltinParamType::Any,
345 arity: BuiltinParamArity::Required,
346 default: None,
347 description: "Receiver object.",
348 },
349 BuiltinParamDescriptor {
350 name: "attrs",
351 ty: BuiltinParamType::Any,
352 arity: BuiltinParamArity::Required,
353 default: None,
354 description: "Struct of attribute updates.",
355 },
356];
357
358const IN_BASE_LABEL_REST: [BuiltinParamDescriptor; 3] = [
359 BuiltinParamDescriptor {
360 name: "obj",
361 ty: BuiltinParamType::Any,
362 arity: BuiltinParamArity::Required,
363 default: None,
364 description: "Receiver object.",
365 },
366 BuiltinParamDescriptor {
367 name: "label",
368 ty: BuiltinParamType::StringScalar,
369 arity: BuiltinParamArity::Required,
370 default: None,
371 description: "Snapshot label.",
372 },
373 BuiltinParamDescriptor {
374 name: "options",
375 ty: BuiltinParamType::Any,
376 arity: BuiltinParamArity::Variadic,
377 default: None,
378 description: "Reserved name/value options.",
379 },
380];
381
382const IN_BASE_REST: [BuiltinParamDescriptor; 2] = [
383 BuiltinParamDescriptor {
384 name: "obj",
385 ty: BuiltinParamType::Any,
386 arity: BuiltinParamArity::Required,
387 default: None,
388 description: "Receiver object.",
389 },
390 BuiltinParamDescriptor {
391 name: "options",
392 ty: BuiltinParamType::Any,
393 arity: BuiltinParamArity::Variadic,
394 default: None,
395 description: "Reserved name/value options.",
396 },
397];
398
399const IN_BASE_SLICE_OPTIONAL: [BuiltinParamDescriptor; 2] = [
400 BuiltinParamDescriptor {
401 name: "obj",
402 ty: BuiltinParamType::Any,
403 arity: BuiltinParamArity::Required,
404 default: None,
405 description: "DataArray receiver object.",
406 },
407 BuiltinParamDescriptor {
408 name: "sliceSpec",
409 ty: BuiltinParamType::Any,
410 arity: BuiltinParamArity::Optional,
411 default: None,
412 description: "Optional slice specification.",
413 },
414];
415
416const IN_BASE_VALUES: [BuiltinParamDescriptor; 2] = [
417 BuiltinParamDescriptor {
418 name: "obj",
419 ty: BuiltinParamType::Any,
420 arity: BuiltinParamArity::Required,
421 default: None,
422 description: "DataArray receiver object.",
423 },
424 BuiltinParamDescriptor {
425 name: "values",
426 ty: BuiltinParamType::Any,
427 arity: BuiltinParamArity::Required,
428 default: None,
429 description: "Full-array values payload.",
430 },
431];
432
433const IN_BASE_SLICE_VALUES: [BuiltinParamDescriptor; 3] = [
434 BuiltinParamDescriptor {
435 name: "obj",
436 ty: BuiltinParamType::Any,
437 arity: BuiltinParamArity::Required,
438 default: None,
439 description: "DataArray receiver object.",
440 },
441 BuiltinParamDescriptor {
442 name: "sliceSpec",
443 ty: BuiltinParamType::Any,
444 arity: BuiltinParamArity::Required,
445 default: None,
446 description: "Slice specification.",
447 },
448 BuiltinParamDescriptor {
449 name: "values",
450 ty: BuiltinParamType::Any,
451 arity: BuiltinParamArity::Required,
452 default: None,
453 description: "Slice values payload.",
454 },
455];
456
457const IN_BASE_NEW_SHAPE_REST: [BuiltinParamDescriptor; 3] = [
458 BuiltinParamDescriptor {
459 name: "obj",
460 ty: BuiltinParamType::Any,
461 arity: BuiltinParamArity::Required,
462 default: None,
463 description: "DataArray receiver object.",
464 },
465 BuiltinParamDescriptor {
466 name: "newShape",
467 ty: BuiltinParamType::Any,
468 arity: BuiltinParamArity::Required,
469 default: None,
470 description: "New array shape.",
471 },
472 BuiltinParamDescriptor {
473 name: "options",
474 ty: BuiltinParamType::Any,
475 arity: BuiltinParamArity::Variadic,
476 default: None,
477 description: "Reserved name/value options.",
478 },
479];
480
481const IN_BASE_VALUE_REST: [BuiltinParamDescriptor; 3] = [
482 BuiltinParamDescriptor {
483 name: "obj",
484 ty: BuiltinParamType::Any,
485 arity: BuiltinParamArity::Required,
486 default: None,
487 description: "DataArray receiver object.",
488 },
489 BuiltinParamDescriptor {
490 name: "value",
491 ty: BuiltinParamType::Any,
492 arity: BuiltinParamArity::Required,
493 default: None,
494 description: "Fill value.",
495 },
496 BuiltinParamDescriptor {
497 name: "options",
498 ty: BuiltinParamType::Any,
499 arity: BuiltinParamArity::Variadic,
500 default: None,
501 description: "Reserved name/value options.",
502 },
503];
504
505const IN_TX_WRITE: [BuiltinParamDescriptor; 5] = [
506 BuiltinParamDescriptor {
507 name: "tx",
508 ty: BuiltinParamType::Any,
509 arity: BuiltinParamArity::Required,
510 default: None,
511 description: "DataTransaction receiver.",
512 },
513 BuiltinParamDescriptor {
514 name: "arrayName",
515 ty: BuiltinParamType::StringScalar,
516 arity: BuiltinParamArity::Required,
517 default: None,
518 description: "Target array name.",
519 },
520 BuiltinParamDescriptor {
521 name: "sliceSpec",
522 ty: BuiltinParamType::Any,
523 arity: BuiltinParamArity::Required,
524 default: None,
525 description: "Slice specification.",
526 },
527 BuiltinParamDescriptor {
528 name: "values",
529 ty: BuiltinParamType::Any,
530 arity: BuiltinParamArity::Required,
531 default: None,
532 description: "Values payload.",
533 },
534 BuiltinParamDescriptor {
535 name: "options",
536 ty: BuiltinParamType::Any,
537 arity: BuiltinParamArity::Variadic,
538 default: None,
539 description: "Reserved name/value options.",
540 },
541];
542
543const IN_TX_ARRAY_SHAPE_REST: [BuiltinParamDescriptor; 4] = [
544 BuiltinParamDescriptor {
545 name: "tx",
546 ty: BuiltinParamType::Any,
547 arity: BuiltinParamArity::Required,
548 default: None,
549 description: "DataTransaction receiver.",
550 },
551 BuiltinParamDescriptor {
552 name: "arrayName",
553 ty: BuiltinParamType::StringScalar,
554 arity: BuiltinParamArity::Required,
555 default: None,
556 description: "Target array name.",
557 },
558 BuiltinParamDescriptor {
559 name: "newShape",
560 ty: BuiltinParamType::Any,
561 arity: BuiltinParamArity::Required,
562 default: None,
563 description: "New shape vector.",
564 },
565 BuiltinParamDescriptor {
566 name: "options",
567 ty: BuiltinParamType::Any,
568 arity: BuiltinParamArity::Variadic,
569 default: None,
570 description: "Reserved name/value options.",
571 },
572];
573
574const IN_TX_ARRAY_VALUE_SLICE_OPT: [BuiltinParamDescriptor; 4] = [
575 BuiltinParamDescriptor {
576 name: "tx",
577 ty: BuiltinParamType::Any,
578 arity: BuiltinParamArity::Required,
579 default: None,
580 description: "DataTransaction receiver.",
581 },
582 BuiltinParamDescriptor {
583 name: "arrayName",
584 ty: BuiltinParamType::StringScalar,
585 arity: BuiltinParamArity::Required,
586 default: None,
587 description: "Target array name.",
588 },
589 BuiltinParamDescriptor {
590 name: "value",
591 ty: BuiltinParamType::Any,
592 arity: BuiltinParamArity::Required,
593 default: None,
594 description: "Fill value.",
595 },
596 BuiltinParamDescriptor {
597 name: "sliceSpec",
598 ty: BuiltinParamType::Any,
599 arity: BuiltinParamArity::Optional,
600 default: None,
601 description: "Optional slice specification.",
602 },
603];
604
605const IN_TX_ARRAY_NAME: [BuiltinParamDescriptor; 2] = [
606 BuiltinParamDescriptor {
607 name: "tx",
608 ty: BuiltinParamType::Any,
609 arity: BuiltinParamArity::Required,
610 default: None,
611 description: "DataTransaction receiver.",
612 },
613 BuiltinParamDescriptor {
614 name: "arrayName",
615 ty: BuiltinParamType::StringScalar,
616 arity: BuiltinParamArity::Required,
617 default: None,
618 description: "Target array name.",
619 },
620];
621
622const IN_TX_ARRAY_META: [BuiltinParamDescriptor; 3] = [
623 BuiltinParamDescriptor {
624 name: "tx",
625 ty: BuiltinParamType::Any,
626 arity: BuiltinParamArity::Required,
627 default: None,
628 description: "DataTransaction receiver.",
629 },
630 BuiltinParamDescriptor {
631 name: "arrayName",
632 ty: BuiltinParamType::StringScalar,
633 arity: BuiltinParamArity::Required,
634 default: None,
635 description: "New array name.",
636 },
637 BuiltinParamDescriptor {
638 name: "meta",
639 ty: BuiltinParamType::Any,
640 arity: BuiltinParamArity::Required,
641 default: None,
642 description: "Array metadata struct.",
643 },
644];
645
646const IN_TX_COMMIT_REST: [BuiltinParamDescriptor; 2] = [
647 BuiltinParamDescriptor {
648 name: "tx",
649 ty: BuiltinParamType::Any,
650 arity: BuiltinParamArity::Required,
651 default: None,
652 description: "DataTransaction receiver.",
653 },
654 BuiltinParamDescriptor {
655 name: "options",
656 ty: BuiltinParamType::Any,
657 arity: BuiltinParamArity::Variadic,
658 default: None,
659 description: "Commit options (e.g. if_manifest).",
660 },
661];
662
/// Declares a builtin descriptor that has exactly one call signature.
///
/// Arguments, in order:
/// 1. name of the public `BuiltinDescriptor` constant to define;
/// 2. name of the private one-element `BuiltinSignatureDescriptor` array;
/// 3. the signature label shown to users;
/// 4. a reference to the input parameter list;
/// 5. a reference to the output parameter list.
///
/// Every generated descriptor uses `BuiltinOutputMode::Fixed`,
/// `BuiltinCompletionPolicy::Public`, and the shared `DATA_DESCRIPTOR_ERRORS`.
macro_rules! one_sig_descriptor {
    ($descriptor:ident, $signatures:ident, $label:expr, $inputs:expr, $outputs:expr) => {
        const $signatures: [BuiltinSignatureDescriptor; 1] = [BuiltinSignatureDescriptor {
            inputs: $inputs,
            outputs: $outputs,
            label: $label,
        }];
        pub const $descriptor: BuiltinDescriptor = BuiltinDescriptor {
            errors: &DATA_DESCRIPTOR_ERRORS,
            completion_policy: BuiltinCompletionPolicy::Public,
            output_mode: BuiltinOutputMode::Fixed,
            signatures: &$signatures,
        };
    };
}
678
679one_sig_descriptor!(
680 DATA_CREATE_DESCRIPTOR,
681 DATA_CREATE_SIGS,
682 "ds = data.create(path, schema, Name, Value, ...)",
683 &IN_PATH_SCHEMA_REST,
684 &OUT_DATASET
685);
686one_sig_descriptor!(
687 DATA_OPEN_DESCRIPTOR,
688 DATA_OPEN_SIGS,
689 "ds = data.open(path, Name, Value, ...)",
690 &IN_PATH_REST,
691 &OUT_DATASET
692);
693one_sig_descriptor!(
694 DATA_EXISTS_DESCRIPTOR,
695 DATA_EXISTS_SIGS,
696 "tf = data.exists(path)",
697 &IN_PATH_REST,
698 &OUT_BOOL
699);
700one_sig_descriptor!(
701 DATA_DELETE_DESCRIPTOR,
702 DATA_DELETE_SIGS,
703 "tf = data.delete(path, Name, Value, ...)",
704 &IN_PATH_REST,
705 &OUT_BOOL
706);
707one_sig_descriptor!(
708 DATA_COPY_DESCRIPTOR,
709 DATA_COPY_SIGS,
710 "tf = data.copy(fromPath, toPath, Name, Value, ...)",
711 &IN_FROM_TO_REST,
712 &OUT_BOOL
713);
714one_sig_descriptor!(
715 DATA_MOVE_DESCRIPTOR,
716 DATA_MOVE_SIGS,
717 "tf = data.move(fromPath, toPath, Name, Value, ...)",
718 &IN_FROM_TO_REST,
719 &OUT_BOOL
720);
721one_sig_descriptor!(
722 DATA_IMPORT_DESCRIPTOR,
723 DATA_IMPORT_SIGS,
724 "ds = data.import(path, format, sourcePath, Name, Value, ...)",
725 &IN_PATH_FORMAT_TARGET_REST,
726 &OUT_DATASET
727);
728one_sig_descriptor!(
729 DATA_EXPORT_DESCRIPTOR,
730 DATA_EXPORT_SIGS,
731 "tf = data.export(path, format, targetPath, Name, Value, ...)",
732 &IN_PATH_FORMAT_TARGET_REST,
733 &OUT_BOOL
734);
735one_sig_descriptor!(
736 DATA_LIST_DESCRIPTOR,
737 DATA_LIST_SIGS,
738 "C = data.list(prefix, Name, Value, ...)",
739 &IN_PREFIX_REST,
740 &OUT_CELL
741);
742one_sig_descriptor!(
743 DATA_INSPECT_DESCRIPTOR,
744 DATA_INSPECT_SIGS,
745 "S = data.inspect(path)",
746 &IN_PATH_REST,
747 &OUT_STRUCT
748);
749
// ---------------------------------------------------------------------------
// `Dataset` method descriptors
//
// Each invocation defines `<NAME>_DESCRIPTOR` (public) and `<NAME>_SIGS` via
// `one_sig_descriptor!`. The receiver is the first input (`obj`/`ds`).
// ---------------------------------------------------------------------------

// Accessors returning scalar metadata strings.
one_sig_descriptor!(
    DATASET_PATH_DESCRIPTOR,
    DATASET_PATH_SIGS,
    "path = Dataset.path(ds)",
    &IN_BASE,
    &OUT_STRING
);
one_sig_descriptor!(
    DATASET_ID_DESCRIPTOR,
    DATASET_ID_SIGS,
    "id = Dataset.id(ds)",
    &IN_BASE,
    &OUT_STRING
);
one_sig_descriptor!(
    DATASET_VERSION_DESCRIPTOR,
    DATASET_VERSION_SIGS,
    "version = Dataset.version(ds)",
    &IN_BASE,
    &OUT_STRING
);
// Array enumeration and lookup.
one_sig_descriptor!(
    DATASET_ARRAYS_DESCRIPTOR,
    DATASET_ARRAYS_SIGS,
    "C = Dataset.arrays(ds)",
    &IN_BASE,
    &OUT_CELL
);
one_sig_descriptor!(
    DATASET_HAS_ARRAY_DESCRIPTOR,
    DATASET_HAS_ARRAY_SIGS,
    "tf = Dataset.has_array(ds, name)",
    &IN_BASE_NAME,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATASET_ARRAY_DESCRIPTOR,
    DATASET_ARRAY_SIGS,
    "arr = Dataset.array(ds, name)",
    &IN_BASE_NAME,
    &OUT_ARRAY
);
// Attribute read/write.
one_sig_descriptor!(
    DATASET_ATTRS_DESCRIPTOR,
    DATASET_ATTRS_SIGS,
    "S = Dataset.attrs(ds)",
    &IN_BASE,
    &OUT_STRUCT
);
one_sig_descriptor!(
    DATASET_GET_ATTR_DESCRIPTOR,
    DATASET_GET_ATTR_SIGS,
    "value = Dataset.get_attr(ds, key, defaultValue)",
    &IN_BASE_KEY_DEFAULT,
    &OUT_VALUE
);
one_sig_descriptor!(
    DATASET_SET_ATTR_DESCRIPTOR,
    DATASET_SET_ATTR_SIGS,
    "tf = Dataset.set_attr(ds, key, value)",
    &IN_BASE_KEY_VALUE,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATASET_SET_ATTRS_DESCRIPTOR,
    DATASET_SET_ATTRS_SIGS,
    "tf = Dataset.set_attrs(ds, attrs)",
    &IN_BASE_ATTRS,
    &OUT_BOOL
);
// Transactions, snapshots and reload.
one_sig_descriptor!(
    DATASET_BEGIN_DESCRIPTOR,
    DATASET_BEGIN_SIGS,
    "tx = Dataset.begin(ds, Name, Value, ...)",
    &IN_BASE_REST,
    &OUT_TX
);
one_sig_descriptor!(
    DATASET_SNAPSHOT_DESCRIPTOR,
    DATASET_SNAPSHOT_SIGS,
    "snapshotPath = Dataset.snapshot(ds, label, Name, Value, ...)",
    &IN_BASE_LABEL_REST,
    &OUT_STRING
);
one_sig_descriptor!(
    DATASET_REFRESH_DESCRIPTOR,
    DATASET_REFRESH_SIGS,
    "ds = Dataset.refresh(ds)",
    &IN_BASE,
    &OUT_DATASET
);
841
// ---------------------------------------------------------------------------
// `DataArray` method descriptors
// ---------------------------------------------------------------------------

// Metadata accessors.
one_sig_descriptor!(
    DATAARRAY_NAME_DESCRIPTOR,
    DATAARRAY_NAME_SIGS,
    "name = DataArray.name(arr)",
    &IN_BASE,
    &OUT_STRING
);
one_sig_descriptor!(
    DATAARRAY_DTYPE_DESCRIPTOR,
    DATAARRAY_DTYPE_SIGS,
    "dtype = DataArray.dtype(arr)",
    &IN_BASE,
    &OUT_STRING
);
one_sig_descriptor!(
    DATAARRAY_SHAPE_DESCRIPTOR,
    DATAARRAY_SHAPE_SIGS,
    "shape = DataArray.shape(arr)",
    &IN_BASE,
    &OUT_TENSOR
);
one_sig_descriptor!(
    DATAARRAY_RANK_DESCRIPTOR,
    DATAARRAY_RANK_SIGS,
    "rank = DataArray.rank(arr)",
    &IN_BASE,
    &OUT_VALUE
);
one_sig_descriptor!(
    DATAARRAY_CHUNK_SHAPE_DESCRIPTOR,
    DATAARRAY_CHUNK_SHAPE_SIGS,
    "chunkShape = DataArray.chunk_shape(arr)",
    &IN_BASE,
    &OUT_TENSOR
);
one_sig_descriptor!(
    DATAARRAY_CODEC_DESCRIPTOR,
    DATAARRAY_CODEC_SIGS,
    "codec = DataArray.codec(arr)",
    &IN_BASE,
    &OUT_STRING
);
// Read: `sliceSpec` is optional in IN_BASE_SLICE_OPTIONAL.
one_sig_descriptor!(
    DATAARRAY_READ_DESCRIPTOR,
    DATAARRAY_READ_SIGS,
    "X = DataArray.read(arr, sliceSpec)",
    &IN_BASE_SLICE_OPTIONAL,
    &OUT_TENSOR
);
// Write has two overloads (whole-array and sliced), so it is declared by hand
// rather than through `one_sig_descriptor!`, which only supports one signature.
const DATAARRAY_WRITE_SIGS: [BuiltinSignatureDescriptor; 2] = [
    BuiltinSignatureDescriptor {
        label: "tf = DataArray.write(arr, values)",
        inputs: &IN_BASE_VALUES,
        outputs: &OUT_BOOL,
    },
    BuiltinSignatureDescriptor {
        label: "tf = DataArray.write(arr, sliceSpec, values)",
        inputs: &IN_BASE_SLICE_VALUES,
        outputs: &OUT_BOOL,
    },
];
pub const DATAARRAY_WRITE_DESCRIPTOR: BuiltinDescriptor = BuiltinDescriptor {
    signatures: &DATAARRAY_WRITE_SIGS,
    output_mode: BuiltinOutputMode::Fixed,
    completion_policy: BuiltinCompletionPolicy::Public,
    errors: &DATA_DESCRIPTOR_ERRORS,
};
// Mutations.
one_sig_descriptor!(
    DATAARRAY_RESIZE_DESCRIPTOR,
    DATAARRAY_RESIZE_SIGS,
    "tf = DataArray.resize(arr, newShape, Name, Value, ...)",
    &IN_BASE_NEW_SHAPE_REST,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATAARRAY_FILL_DESCRIPTOR,
    DATAARRAY_FILL_SIGS,
    "tf = DataArray.fill(arr, value, Name, Value, ...)",
    &IN_BASE_VALUE_REST,
    &OUT_BOOL
);
923
// ---------------------------------------------------------------------------
// `DataTransaction` method descriptors
//
// NOTE(review): several of these reuse the generic IN_BASE* lists, whose first
// parameter is named `obj` rather than `tx` as the labels show.
// ---------------------------------------------------------------------------

one_sig_descriptor!(
    DATATX_ID_DESCRIPTOR,
    DATATX_ID_SIGS,
    "id = DataTransaction.id(tx)",
    &IN_BASE,
    &OUT_STRING
);
// Staged operations.
one_sig_descriptor!(
    DATATX_WRITE_DESCRIPTOR,
    DATATX_WRITE_SIGS_1,
    "tf = DataTransaction.write(tx, arrayName, sliceSpec, values, Name, Value, ...)",
    &IN_TX_WRITE,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_SET_ATTR_DESCRIPTOR,
    DATATX_SET_ATTR_SIGS,
    "tf = DataTransaction.set_attr(tx, key, value)",
    &IN_BASE_KEY_VALUE,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_SET_ATTRS_DESCRIPTOR,
    DATATX_SET_ATTRS_SIGS,
    "tf = DataTransaction.set_attrs(tx, attrs)",
    &IN_BASE_ATTRS,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_RESIZE_DESCRIPTOR,
    DATATX_RESIZE_SIGS,
    "tf = DataTransaction.resize(tx, arrayName, newShape, Name, Value, ...)",
    &IN_TX_ARRAY_SHAPE_REST,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_FILL_DESCRIPTOR,
    DATATX_FILL_SIGS,
    "tf = DataTransaction.fill(tx, arrayName, value, sliceSpec)",
    &IN_TX_ARRAY_VALUE_SLICE_OPT,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_DELETE_ARRAY_DESCRIPTOR,
    DATATX_DELETE_ARRAY_SIGS,
    "tf = DataTransaction.delete_array(tx, arrayName)",
    &IN_TX_ARRAY_NAME,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_CREATE_ARRAY_DESCRIPTOR,
    DATATX_CREATE_ARRAY_SIGS,
    "tf = DataTransaction.create_array(tx, arrayName, meta)",
    &IN_TX_ARRAY_META,
    &OUT_BOOL
);
// Lifecycle: commit (plus the free-function `commit` alias), abort, status.
one_sig_descriptor!(
    DATATX_COMMIT_DESCRIPTOR,
    DATATX_COMMIT_SIGS,
    "tf = DataTransaction.commit(tx, Name, Value, ...)",
    &IN_TX_COMMIT_REST,
    &OUT_BOOL
);
one_sig_descriptor!(
    COMMIT_ALIAS_DESCRIPTOR,
    COMMIT_ALIAS_SIGS,
    "tf = commit(tx, Name, Value, ...)",
    &IN_TX_COMMIT_REST,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_ABORT_DESCRIPTOR,
    DATATX_ABORT_SIGS,
    "tf = DataTransaction.abort(tx)",
    &IN_BASE,
    &OUT_BOOL
);
one_sig_descriptor!(
    DATATX_STATUS_DESCRIPTOR,
    DATATX_STATUS_SIGS,
    "status = DataTransaction.status(tx)",
    &IN_BASE,
    &OUT_STRING
);
1008
1009#[runtime_builtin(
1010 name = "data.create",
1011 category = "io/data",
1012 summary = "Create a typed dataset at a .data path.",
1013 keywords = "data,dataset,create,persistence",
1014 sink = true,
1015 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1016 descriptor(crate::builtins::io::data::DATA_CREATE_DESCRIPTOR),
1017 builtin_path = "crate::builtins::io::data"
1018)]
1019async fn data_create_builtin(
1020 path: Value,
1021 schema: Value,
1022 _rest: Vec<Value>,
1023) -> BuiltinResult<Value> {
1024 let path = parse_string(&path, "data.create path")?;
1025 let root = dataset_root(&path);
1026 let schema = parse_schema(&schema)?;
1027 let now = now_rfc3339();
1028 let mut arrays = BTreeMap::new();
1029 for (name, mut meta) in schema.arrays {
1030 let total = meta.shape.iter().copied().product::<usize>();
1031 let payload = DataArrayPayload {
1032 dtype: meta.dtype.clone(),
1033 shape: meta.shape.clone(),
1034 values: vec![0.0; total],
1035 };
1036 let (payload_path, chunk_index_path) =
1037 write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1038 meta.data_path = make_rel_data_path(&root, &payload_path)?;
1039 meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1040 arrays.insert(name, meta);
1041 }
1042
1043 let manifest = DataManifest {
1044 schema_version: 1,
1045 format: "runmat-data".to_string(),
1046 dataset_id: crate::data::new_dataset_id(),
1047 name: root.file_name().map(|v| v.to_string_lossy().to_string()),
1048 created_at: now.clone(),
1049 updated_at: now,
1050 arrays,
1051 attrs: BTreeMap::new(),
1052 txn_sequence: 0,
1053 };
1054 write_manifest_async(&root, &manifest).await?;
1055 Ok(dataset_object(&path, &manifest))
1056}
1057
1058#[runtime_builtin(
1059 name = "data.open",
1060 category = "io/data",
1061 summary = "Open a dataset handle from a .data path.",
1062 keywords = "data,dataset,open,persistence",
1063 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1064 descriptor(crate::builtins::io::data::DATA_OPEN_DESCRIPTOR),
1065 builtin_path = "crate::builtins::io::data"
1066)]
1067async fn data_open_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1068 let path = parse_string(&path, "data.open path")?;
1069 let root = dataset_root(&path);
1070 let manifest = read_manifest_async(&root).await?;
1071 let mut ds = dataset_object(&path, &manifest);
1072 hydrate_dataset_descriptor_async(&path, &mut ds).await;
1073 Ok(ds)
1074}
1075
1076#[runtime_builtin(
1077 name = "data.exists",
1078 category = "io/data",
1079 summary = "Check if dataset exists.",
1080 keywords = "data,dataset,exists",
1081 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1082 descriptor(crate::builtins::io::data::DATA_EXISTS_DESCRIPTOR),
1083 builtin_path = "crate::builtins::io::data"
1084)]
1085async fn data_exists_builtin(path: Value) -> BuiltinResult<Value> {
1086 let path = parse_string(&path, "data.exists path")?;
1087 let root = dataset_root(&path);
1088 let exists = runmat_filesystem::metadata_async(manifest_path(&root))
1089 .await
1090 .is_ok();
1091 Ok(Value::Bool(exists))
1092}
1093
1094#[runtime_builtin(
1095 name = "data.delete",
1096 category = "io/data",
1097 summary = "Delete a dataset path.",
1098 keywords = "data,dataset,delete",
1099 sink = true,
1100 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1101 descriptor(crate::builtins::io::data::DATA_DELETE_DESCRIPTOR),
1102 builtin_path = "crate::builtins::io::data"
1103)]
1104async fn data_delete_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1105 let path = parse_string(&path, "data.delete path")?;
1106 let root = dataset_root(&path);
1107 runmat_filesystem::remove_dir_all_async(&root)
1108 .await
1109 .map_err(|err| {
1110 data_error(format!(
1111 "data.delete: failed to remove '{}': {err}",
1112 root.display()
1113 ))
1114 })?;
1115 Ok(Value::Bool(true))
1116}
1117
1118#[runtime_builtin(
1119 name = "data.copy",
1120 category = "io/data",
1121 summary = "Copy dataset to new path.",
1122 keywords = "data,dataset,copy",
1123 sink = true,
1124 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1125 descriptor(crate::builtins::io::data::DATA_COPY_DESCRIPTOR),
1126 builtin_path = "crate::builtins::io::data"
1127)]
1128async fn data_copy_builtin(
1129 from_path: Value,
1130 to_path: Value,
1131 _rest: Vec<Value>,
1132) -> BuiltinResult<Value> {
1133 let from = parse_string(&from_path, "data.copy fromPath")?;
1134 let to = parse_string(&to_path, "data.copy toPath")?;
1135 copy_dir_recursive(&dataset_root(&from), &dataset_root(&to)).await?;
1136 Ok(Value::Bool(true))
1137}
1138
1139#[runtime_builtin(
1140 name = "data.move",
1141 category = "io/data",
1142 summary = "Move dataset to new path.",
1143 keywords = "data,dataset,move",
1144 sink = true,
1145 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1146 descriptor(crate::builtins::io::data::DATA_MOVE_DESCRIPTOR),
1147 builtin_path = "crate::builtins::io::data"
1148)]
1149async fn data_move_builtin(
1150 from_path: Value,
1151 to_path: Value,
1152 _rest: Vec<Value>,
1153) -> BuiltinResult<Value> {
1154 let from = parse_string(&from_path, "data.move fromPath")?;
1155 let to = parse_string(&to_path, "data.move toPath")?;
1156 runmat_filesystem::rename_async(dataset_root(&from), dataset_root(&to))
1157 .await
1158 .map_err(|err| {
1159 data_error(format!(
1160 "data.move: failed to move dataset '{from}' -> '{to}': {err}"
1161 ))
1162 })?;
1163 Ok(Value::Bool(true))
1164}
1165
1166#[runtime_builtin(
1167 name = "data.import",
1168 category = "io/data",
1169 summary = "Import an existing dataset file path.",
1170 keywords = "data,dataset,import",
1171 sink = true,
1172 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1173 descriptor(crate::builtins::io::data::DATA_IMPORT_DESCRIPTOR),
1174 builtin_path = "crate::builtins::io::data"
1175)]
1176async fn data_import_builtin(
1177 path: Value,
1178 format: Value,
1179 source_path: Value,
1180 _rest: Vec<Value>,
1181) -> BuiltinResult<Value> {
1182 let path = parse_string(&path, "data.import path")?;
1183 let format = parse_string(&format, "data.import format")?;
1184 if !format.eq_ignore_ascii_case("data") {
1185 return Err(data_error(
1186 "data.import currently supports only format='data'",
1187 ));
1188 }
1189 let source_path = parse_string(&source_path, "data.import sourcePath")?;
1190 copy_dir_recursive(&dataset_root(&source_path), &dataset_root(&path)).await?;
1191 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1192 Ok(dataset_object(&path, &manifest))
1193}
1194
1195#[runtime_builtin(
1196 name = "data.export",
1197 category = "io/data",
1198 summary = "Export dataset to target path.",
1199 keywords = "data,dataset,export",
1200 sink = true,
1201 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1202 descriptor(crate::builtins::io::data::DATA_EXPORT_DESCRIPTOR),
1203 builtin_path = "crate::builtins::io::data"
1204)]
1205async fn data_export_builtin(
1206 path: Value,
1207 format: Value,
1208 target_path: Value,
1209 _rest: Vec<Value>,
1210) -> BuiltinResult<Value> {
1211 let path = parse_string(&path, "data.export path")?;
1212 let format = parse_string(&format, "data.export format")?;
1213 if !format.eq_ignore_ascii_case("data") {
1214 return Err(data_error(
1215 "data.export currently supports only format='data'",
1216 ));
1217 }
1218 let target_path = parse_string(&target_path, "data.export targetPath")?;
1219 copy_dir_recursive(&dataset_root(&path), &dataset_root(&target_path)).await?;
1220 Ok(Value::Bool(true))
1221}
1222
1223#[runtime_builtin(
1224 name = "data.list",
1225 category = "io/data",
1226 summary = "List dataset paths under a prefix.",
1227 keywords = "data,dataset,list",
1228 type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
1229 descriptor(crate::builtins::io::data::DATA_LIST_DESCRIPTOR),
1230 builtin_path = "crate::builtins::io::data"
1231)]
1232async fn data_list_builtin(path_prefix: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1233 let prefix = parse_string(&path_prefix, "data.list prefix")?;
1234 let root = PathBuf::from(prefix);
1235 let entries = runmat_filesystem::read_dir_async(&root)
1236 .await
1237 .map_err(|err| {
1238 data_error(format!(
1239 "data.list: failed to read '{}': {err}",
1240 root.display()
1241 ))
1242 })?;
1243 let mut values = Vec::new();
1244 for entry in entries {
1245 if !entry.is_dir() {
1246 continue;
1247 }
1248 let candidate = entry.path();
1249 if candidate.extension().and_then(|s| s.to_str()) != Some("data") {
1250 continue;
1251 }
1252 if runmat_filesystem::metadata_async(candidate.join("manifest.json"))
1253 .await
1254 .is_ok()
1255 {
1256 values.push(Value::String(candidate.to_string_lossy().to_string()));
1257 }
1258 }
1259 let cols = values.len();
1260 make_cell(values, 1, cols).map_err(data_error)
1261}
1262
1263#[runtime_builtin(
1264 name = "data.inspect",
1265 category = "io/data",
1266 summary = "Inspect dataset metadata and schema fields.",
1267 keywords = "data,dataset,inspect,schema",
1268 type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
1269 descriptor(crate::builtins::io::data::DATA_INSPECT_DESCRIPTOR),
1270 builtin_path = "crate::builtins::io::data"
1271)]
1272async fn data_inspect_builtin(path: Value) -> BuiltinResult<Value> {
1273 let path = parse_string(&path, "data.inspect path")?;
1274 let root = dataset_root(&path);
1275 let manifest = read_manifest_async(&root).await?;
1276 let mut out = StructValue::new();
1277 out.fields.insert("path".to_string(), Value::String(path));
1278 out.fields
1279 .insert("id".to_string(), Value::String(manifest.dataset_id));
1280 out.fields.insert(
1281 "arrayCount".to_string(),
1282 Value::Num(manifest.arrays.len() as f64),
1283 );
1284 out.fields
1285 .insert("updatedAt".to_string(), Value::String(manifest.updated_at));
1286 Ok(Value::Struct(out))
1287}
1288
1289#[runtime_builtin(
1290 name = "Dataset.path",
1291 category = "io/data",
1292 summary = "Return dataset path.",
1293 keywords = "dataset,path",
1294 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1295 descriptor(crate::builtins::io::data::DATASET_PATH_DESCRIPTOR),
1296 builtin_path = "crate::builtins::io::data"
1297)]
1298async fn dataset_path_builtin(base: Value) -> BuiltinResult<Value> {
1299 let obj = as_object(&base, "Dataset.path")?;
1300 Ok(get_object_prop(obj, "__data_path")?.clone())
1301}
1302
1303#[runtime_builtin(
1304 name = "Dataset.id",
1305 category = "io/data",
1306 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1307 descriptor(crate::builtins::io::data::DATASET_ID_DESCRIPTOR),
1308 builtin_path = "crate::builtins::io::data"
1309)]
1310async fn dataset_id_builtin(base: Value) -> BuiltinResult<Value> {
1311 let obj = as_object(&base, "Dataset.id")?;
1312 Ok(get_object_prop(obj, "__data_id")?.clone())
1313}
1314
1315#[runtime_builtin(
1316 name = "Dataset.version",
1317 category = "io/data",
1318 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1319 descriptor(crate::builtins::io::data::DATASET_VERSION_DESCRIPTOR),
1320 builtin_path = "crate::builtins::io::data"
1321)]
1322async fn dataset_version_builtin(base: Value) -> BuiltinResult<Value> {
1323 let obj = as_object(&base, "Dataset.version")?;
1324 Ok(get_object_prop(obj, "__data_version")?.clone())
1325}
1326
1327#[runtime_builtin(
1328 name = "Dataset.arrays",
1329 category = "io/data",
1330 type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
1331 descriptor(crate::builtins::io::data::DATASET_ARRAYS_DESCRIPTOR),
1332 builtin_path = "crate::builtins::io::data"
1333)]
1334async fn dataset_arrays_builtin(base: Value) -> BuiltinResult<Value> {
1335 let path = dataset_path_from_object(&base, "Dataset.arrays")?;
1336 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1337 let values: Vec<Value> = manifest
1338 .arrays
1339 .keys()
1340 .map(|k| Value::String(k.clone()))
1341 .collect();
1342 make_cell(values.clone(), 1, values.len()).map_err(data_error)
1343}
1344
1345#[runtime_builtin(
1346 name = "Dataset.has_array",
1347 category = "io/data",
1348 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1349 descriptor(crate::builtins::io::data::DATASET_HAS_ARRAY_DESCRIPTOR),
1350 builtin_path = "crate::builtins::io::data"
1351)]
1352async fn dataset_has_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
1353 let path = dataset_path_from_object(&base, "Dataset.has_array")?;
1354 let name = parse_string(&name, "Dataset.has_array name")?;
1355 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1356 Ok(Value::Bool(manifest.arrays.contains_key(&name)))
1357}
1358
1359#[runtime_builtin(
1360 name = "Dataset.array",
1361 category = "io/data",
1362 type_resolver(crate::builtins::io::type_resolvers::data_array_type),
1363 descriptor(crate::builtins::io::data::DATASET_ARRAY_DESCRIPTOR),
1364 builtin_path = "crate::builtins::io::data"
1365)]
1366async fn dataset_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
1367 let path = dataset_path_from_object(&base, "Dataset.array")?;
1368 let name = parse_string(&name, "Dataset.array name")?;
1369 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1370 if !manifest.arrays.contains_key(&name) {
1371 return Err(data_error(format!(
1372 "Dataset.array: array '{name}' not found"
1373 )));
1374 }
1375 Ok(array_object(&path, &name))
1376}
1377
1378#[runtime_builtin(
1379 name = "Dataset.attrs",
1380 category = "io/data",
1381 type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
1382 descriptor(crate::builtins::io::data::DATASET_ATTRS_DESCRIPTOR),
1383 builtin_path = "crate::builtins::io::data"
1384)]
1385async fn dataset_attrs_builtin(base: Value) -> BuiltinResult<Value> {
1386 let path = dataset_path_from_object(&base, "Dataset.attrs")?;
1387 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1388 Ok(attrs_to_struct(&manifest.attrs))
1389}
1390
1391#[runtime_builtin(
1392 name = "Dataset.get_attr",
1393 category = "io/data",
1394 type_resolver(crate::builtins::io::type_resolvers::data_unknown_type),
1395 descriptor(crate::builtins::io::data::DATASET_GET_ATTR_DESCRIPTOR),
1396 builtin_path = "crate::builtins::io::data"
1397)]
1398async fn dataset_get_attr_builtin(
1399 base: Value,
1400 key: Value,
1401 rest: Vec<Value>,
1402) -> BuiltinResult<Value> {
1403 let path = dataset_path_from_object(&base, "Dataset.get_attr")?;
1404 let key = parse_string(&key, "Dataset.get_attr key")?;
1405 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1406 if let Some(value) = manifest.attrs.get(&key) {
1407 return Ok(json_to_value(value));
1408 }
1409 Ok(rest.first().cloned().unwrap_or(Value::Num(0.0)))
1410}
1411
1412#[runtime_builtin(
1413 name = "Dataset.set_attr",
1414 category = "io/data",
1415 sink = true,
1416 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1417 descriptor(crate::builtins::io::data::DATASET_SET_ATTR_DESCRIPTOR),
1418 builtin_path = "crate::builtins::io::data"
1419)]
1420async fn dataset_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
1421 let path = dataset_path_from_object(&base, "Dataset.set_attr")?;
1422 let key = parse_string(&key, "Dataset.set_attr key")?;
1423 let root = dataset_root(&path);
1424 let mut manifest = read_manifest_async(&root).await?;
1425 manifest.attrs.insert(key, value_to_json(&value));
1426 manifest.updated_at = now_rfc3339();
1427 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1428 write_manifest_async(&root, &manifest).await?;
1429 Ok(Value::Bool(true))
1430}
1431
1432#[runtime_builtin(
1433 name = "Dataset.set_attrs",
1434 category = "io/data",
1435 sink = true,
1436 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1437 descriptor(crate::builtins::io::data::DATASET_SET_ATTRS_DESCRIPTOR),
1438 builtin_path = "crate::builtins::io::data"
1439)]
1440async fn dataset_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
1441 let path = dataset_path_from_object(&base, "Dataset.set_attrs")?;
1442 let Value::Struct(incoming) = attrs else {
1443 return Err(data_error("Dataset.set_attrs: attrs must be a struct"));
1444 };
1445 let root = dataset_root(&path);
1446 let mut manifest = read_manifest_async(&root).await?;
1447 for (k, v) in incoming.fields {
1448 manifest.attrs.insert(k, value_to_json(&v));
1449 }
1450 manifest.updated_at = now_rfc3339();
1451 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1452 write_manifest_async(&root, &manifest).await?;
1453 Ok(Value::Bool(true))
1454}
1455
1456#[runtime_builtin(
1457 name = "Dataset.begin",
1458 category = "io/data",
1459 type_resolver(crate::builtins::io::type_resolvers::data_tx_type),
1460 descriptor(crate::builtins::io::data::DATASET_BEGIN_DESCRIPTOR),
1461 builtin_path = "crate::builtins::io::data"
1462)]
1463async fn dataset_begin_builtin(base: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
1464 let path = dataset_path_from_object(&base, "Dataset.begin")?;
1465 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1466 let tx_id = start_tx(path.clone(), manifest.txn_sequence);
1467 tracing::info!(
1468 target: "runmat.data",
1469 dataset = path,
1470 tx_id = tx_id,
1471 base_sequence = manifest.txn_sequence,
1472 "data transaction begin"
1473 );
1474 Ok(transaction_object(&path, &tx_id))
1475}
1476
1477#[runtime_builtin(
1478 name = "Dataset.snapshot",
1479 category = "io/data",
1480 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1481 descriptor(crate::builtins::io::data::DATASET_SNAPSHOT_DESCRIPTOR),
1482 builtin_path = "crate::builtins::io::data"
1483)]
1484async fn dataset_snapshot_builtin(
1485 base: Value,
1486 label: Value,
1487 _rest: Vec<Value>,
1488) -> BuiltinResult<Value> {
1489 let path = dataset_path_from_object(&base, "Dataset.snapshot")?;
1490 let label = parse_string(&label, "Dataset.snapshot label")?;
1491 let root = dataset_root(&path);
1492 let snapshots = root.join(".snapshots");
1493 runmat_filesystem::create_dir_all_async(&snapshots)
1494 .await
1495 .map_err(|err| {
1496 data_error(format!(
1497 "Dataset.snapshot: failed to create snapshots dir: {err}"
1498 ))
1499 })?;
1500 let src = manifest_path(&root);
1501 let dst = snapshots.join(format!("{}.manifest.json", sanitize_label(&label)));
1502 copy_file(&src, &dst).await?;
1503 Ok(Value::String(dst.to_string_lossy().to_string()))
1504}
1505
1506#[runtime_builtin(
1507 name = "Dataset.refresh",
1508 category = "io/data",
1509 type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
1510 descriptor(crate::builtins::io::data::DATASET_REFRESH_DESCRIPTOR),
1511 builtin_path = "crate::builtins::io::data"
1512)]
1513async fn dataset_refresh_builtin(base: Value) -> BuiltinResult<Value> {
1514 let path = dataset_path_from_object(&base, "Dataset.refresh")?;
1515 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1516 let mut ds = dataset_object(&path, &manifest);
1517 hydrate_dataset_descriptor_async(&path, &mut ds).await;
1518 Ok(ds)
1519}
1520
1521#[runtime_builtin(
1522 name = "DataArray.name",
1523 category = "io/data",
1524 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1525 descriptor(crate::builtins::io::data::DATAARRAY_NAME_DESCRIPTOR),
1526 builtin_path = "crate::builtins::io::data"
1527)]
1528async fn data_array_name_builtin(base: Value) -> BuiltinResult<Value> {
1529 let obj = as_object(&base, "DataArray.name")?;
1530 Ok(get_object_prop(obj, "__array_name")?.clone())
1531}
1532
1533#[runtime_builtin(
1534 name = "DataArray.dtype",
1535 category = "io/data",
1536 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1537 descriptor(crate::builtins::io::data::DATAARRAY_DTYPE_DESCRIPTOR),
1538 builtin_path = "crate::builtins::io::data"
1539)]
1540async fn data_array_dtype_builtin(base: Value) -> BuiltinResult<Value> {
1541 let (path, name) = array_identity(&base, "DataArray.dtype")?;
1542 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1543 let meta = manifest
1544 .arrays
1545 .get(&name)
1546 .ok_or_else(|| data_error(format!("DataArray.dtype: array '{name}' not found")))?;
1547 Ok(Value::String(meta.dtype.clone()))
1548}
1549
1550#[runtime_builtin(
1551 name = "DataArray.shape",
1552 category = "io/data",
1553 type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
1554 descriptor(crate::builtins::io::data::DATAARRAY_SHAPE_DESCRIPTOR),
1555 builtin_path = "crate::builtins::io::data"
1556)]
1557async fn data_array_shape_builtin(base: Value) -> BuiltinResult<Value> {
1558 let (path, name) = array_identity(&base, "DataArray.shape")?;
1559 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1560 let meta = manifest
1561 .arrays
1562 .get(&name)
1563 .ok_or_else(|| data_error(format!("DataArray.shape: array '{name}' not found")))?;
1564 let values = meta.shape.iter().map(|v| *v as f64).collect::<Vec<_>>();
1565 let tensor = Tensor::new(values, vec![1, meta.shape.len()])
1566 .map_err(|err| data_error(format!("DataArray.shape: {err}")))?;
1567 Ok(Value::Tensor(tensor))
1568}
1569
1570#[runtime_builtin(
1571 name = "DataArray.rank",
1572 category = "io/data",
1573 type_resolver(crate::builtins::io::type_resolvers::data_int_type),
1574 descriptor(crate::builtins::io::data::DATAARRAY_RANK_DESCRIPTOR),
1575 builtin_path = "crate::builtins::io::data"
1576)]
1577async fn data_array_rank_builtin(base: Value) -> BuiltinResult<Value> {
1578 let (path, name) = array_identity(&base, "DataArray.rank")?;
1579 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1580 let meta = manifest
1581 .arrays
1582 .get(&name)
1583 .ok_or_else(|| data_error(format!("DataArray.rank: array '{name}' not found")))?;
1584 Ok(Value::Num(meta.shape.len() as f64))
1585}
1586
1587#[runtime_builtin(
1588 name = "DataArray.chunk_shape",
1589 category = "io/data",
1590 type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
1591 descriptor(crate::builtins::io::data::DATAARRAY_CHUNK_SHAPE_DESCRIPTOR),
1592 builtin_path = "crate::builtins::io::data"
1593)]
1594async fn data_array_chunk_shape_builtin(base: Value) -> BuiltinResult<Value> {
1595 let (path, name) = array_identity(&base, "DataArray.chunk_shape")?;
1596 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1597 let meta = manifest
1598 .arrays
1599 .get(&name)
1600 .ok_or_else(|| data_error(format!("DataArray.chunk_shape: array '{name}' not found")))?;
1601 let values = meta
1602 .chunk_shape
1603 .iter()
1604 .map(|v| *v as f64)
1605 .collect::<Vec<_>>();
1606 let tensor = Tensor::new(values, vec![1, meta.chunk_shape.len()])
1607 .map_err(|err| data_error(format!("DataArray.chunk_shape: {err}")))?;
1608 Ok(Value::Tensor(tensor))
1609}
1610
1611#[runtime_builtin(
1612 name = "DataArray.codec",
1613 category = "io/data",
1614 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1615 descriptor(crate::builtins::io::data::DATAARRAY_CODEC_DESCRIPTOR),
1616 builtin_path = "crate::builtins::io::data"
1617)]
1618async fn data_array_codec_builtin(base: Value) -> BuiltinResult<Value> {
1619 let (path, name) = array_identity(&base, "DataArray.codec")?;
1620 let manifest = read_manifest_async(&dataset_root(&path)).await?;
1621 let meta = manifest
1622 .arrays
1623 .get(&name)
1624 .ok_or_else(|| data_error(format!("DataArray.codec: array '{name}' not found")))?;
1625 Ok(Value::String(meta.codec.clone()))
1626}
1627
1628#[runtime_builtin(
1629 name = "DataArray.read",
1630 category = "io/data",
1631 type_resolver(crate::builtins::io::type_resolvers::data_tensor_type),
1632 descriptor(crate::builtins::io::data::DATAARRAY_READ_DESCRIPTOR),
1633 builtin_path = "crate::builtins::io::data"
1634)]
1635async fn data_array_read_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1636 let (path, name) = array_identity(&base, "DataArray.read")?;
1637 let root = dataset_root(&path);
1638 let manifest = read_manifest_async(&root).await?;
1639 let meta = manifest
1640 .arrays
1641 .get(&name)
1642 .ok_or_else(|| data_error(format!("DataArray.read: array '{name}' not found")))?;
1643 let payload = read_array_payload_async(&root, meta).await?;
1644 let sliced = if let Some(slice_spec) = rest.first() {
1645 read_slice_payload(&payload, slice_spec)?
1646 } else {
1647 payload
1648 };
1649 let tensor = Tensor::new(sliced.values, sliced.shape)
1650 .map_err(|err| data_error(format!("DataArray.read: {err}")))?;
1651 Ok(Value::Tensor(tensor))
1652}
1653
1654#[runtime_builtin(
1655 name = "DataArray.write",
1656 category = "io/data",
1657 sink = true,
1658 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1659 descriptor(crate::builtins::io::data::DATAARRAY_WRITE_DESCRIPTOR),
1660 builtin_path = "crate::builtins::io::data"
1661)]
1662async fn data_array_write_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1663 let (path, name) = array_identity(&base, "DataArray.write")?;
1664 let (slice_spec, value) = match rest.as_slice() {
1665 [v] => (None, v),
1666 [slice, v] => (Some(slice), v),
1667 _ => {
1668 return Err(data_error(
1669 "DataArray.write expects values or (sliceSpec, values) arguments",
1670 ))
1671 }
1672 };
1673 write_array_full_async(&path, &name, slice_spec, value).await?;
1674 Ok(Value::Bool(true))
1675}
1676
1677#[runtime_builtin(
1678 name = "DataArray.resize",
1679 category = "io/data",
1680 sink = true,
1681 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1682 descriptor(crate::builtins::io::data::DATAARRAY_RESIZE_DESCRIPTOR),
1683 builtin_path = "crate::builtins::io::data"
1684)]
1685async fn data_array_resize_builtin(
1686 base: Value,
1687 new_shape: Value,
1688 _rest: Vec<Value>,
1689) -> BuiltinResult<Value> {
1690 let (path, name) = array_identity(&base, "DataArray.resize")?;
1691 let shape = parse_shape_from_value(&new_shape)?;
1692 let root = dataset_root(&path);
1693 let mut manifest = read_manifest_async(&root).await?;
1694 let meta = manifest
1695 .arrays
1696 .get_mut(&name)
1697 .ok_or_else(|| data_error(format!("DataArray.resize: array '{name}' not found")))?;
1698 meta.shape = shape.clone();
1699 let payload = DataArrayPayload {
1700 dtype: meta.dtype.clone(),
1701 shape: shape.clone(),
1702 values: vec![0.0; shape.iter().copied().product()],
1703 };
1704 let (payload_path, chunk_index_path) =
1705 write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1706 meta.data_path = make_rel_data_path(&root, &payload_path)?;
1707 meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1708 manifest.updated_at = now_rfc3339();
1709 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1710 write_manifest_async(&root, &manifest).await?;
1711 Ok(Value::Bool(true))
1712}
1713
1714#[runtime_builtin(
1715 name = "DataArray.fill",
1716 category = "io/data",
1717 sink = true,
1718 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1719 descriptor(crate::builtins::io::data::DATAARRAY_FILL_DESCRIPTOR),
1720 builtin_path = "crate::builtins::io::data"
1721)]
1722async fn data_array_fill_builtin(
1723 base: Value,
1724 value: Value,
1725 _rest: Vec<Value>,
1726) -> BuiltinResult<Value> {
1727 let (path, name) = array_identity(&base, "DataArray.fill")?;
1728 let root = dataset_root(&path);
1729 let mut manifest = read_manifest_async(&root).await?;
1730 let meta = manifest
1731 .arrays
1732 .get_mut(&name)
1733 .ok_or_else(|| data_error(format!("DataArray.fill: array '{name}' not found")))?;
1734 let scalar = scalar_to_f64(&value)?;
1735 let payload = DataArrayPayload {
1736 dtype: meta.dtype.clone(),
1737 shape: meta.shape.clone(),
1738 values: vec![scalar; meta.shape.iter().copied().product()],
1739 };
1740 let (payload_path, chunk_index_path) =
1741 write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
1742 meta.data_path = make_rel_data_path(&root, &payload_path)?;
1743 meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
1744 manifest.updated_at = now_rfc3339();
1745 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
1746 write_manifest_async(&root, &manifest).await?;
1747 Ok(Value::Bool(true))
1748}
1749
1750#[runtime_builtin(
1751 name = "DataTransaction.id",
1752 category = "io/data",
1753 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
1754 descriptor(crate::builtins::io::data::DATATX_ID_DESCRIPTOR),
1755 builtin_path = "crate::builtins::io::data"
1756)]
1757async fn data_tx_id_builtin(base: Value) -> BuiltinResult<Value> {
1758 let obj = as_object(&base, "DataTransaction.id")?;
1759 Ok(get_object_prop(obj, "__tx_id")?.clone())
1760}
1761
1762#[runtime_builtin(
1763 name = "DataTransaction.write",
1764 category = "io/data",
1765 sink = true,
1766 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1767 descriptor(crate::builtins::io::data::DATATX_WRITE_DESCRIPTOR),
1768 builtin_path = "crate::builtins::io::data"
1769)]
1770async fn data_tx_write_builtin(
1771 base: Value,
1772 array_name: Value,
1773 slice: Value,
1774 values: Value,
1775 _rest: Vec<Value>,
1776) -> BuiltinResult<Value> {
1777 let tx_id = tx_id_from_object(&base, "DataTransaction.write")?;
1778 let array_name = parse_string(&array_name, "DataTransaction.write arrayName")?;
1779 with_tx_mut(&tx_id, |tx| {
1780 if tx.status != TxnStatus::Open {
1781 return Err(data_error("DataTransaction.write: transaction is not open"));
1782 }
1783 tx.writes.push(PendingWrite {
1784 array: array_name,
1785 slice_spec: Some(slice),
1786 value: values,
1787 });
1788 Ok(())
1789 })?;
1790 Ok(Value::Bool(true))
1791}
1792
1793#[runtime_builtin(
1794 name = "DataTransaction.set_attr",
1795 category = "io/data",
1796 sink = true,
1797 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1798 descriptor(crate::builtins::io::data::DATATX_SET_ATTR_DESCRIPTOR),
1799 builtin_path = "crate::builtins::io::data"
1800)]
1801async fn data_tx_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
1802 let tx_id = tx_id_from_object(&base, "DataTransaction.set_attr")?;
1803 let key = parse_string(&key, "DataTransaction.set_attr key")?;
1804 with_tx_mut(&tx_id, |tx| {
1805 if tx.status != TxnStatus::Open {
1806 return Err(data_error(
1807 "DataTransaction.set_attr: transaction is not open",
1808 ));
1809 }
1810 tx.attrs.insert(key, value);
1811 Ok(())
1812 })?;
1813 Ok(Value::Bool(true))
1814}
1815
1816#[runtime_builtin(
1817 name = "DataTransaction.set_attrs",
1818 category = "io/data",
1819 sink = true,
1820 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1821 descriptor(crate::builtins::io::data::DATATX_SET_ATTRS_DESCRIPTOR),
1822 builtin_path = "crate::builtins::io::data"
1823)]
1824async fn data_tx_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
1825 let tx_id = tx_id_from_object(&base, "DataTransaction.set_attrs")?;
1826 let Value::Struct(incoming) = attrs else {
1827 return Err(data_error(
1828 "DataTransaction.set_attrs: attrs must be struct",
1829 ));
1830 };
1831 with_tx_mut(&tx_id, |tx| {
1832 if tx.status != TxnStatus::Open {
1833 return Err(data_error(
1834 "DataTransaction.set_attrs: transaction is not open",
1835 ));
1836 }
1837 for (k, v) in incoming.fields {
1838 tx.attrs.insert(k, v);
1839 }
1840 Ok(())
1841 })?;
1842 Ok(Value::Bool(true))
1843}
1844
1845#[runtime_builtin(
1846 name = "DataTransaction.resize",
1847 category = "io/data",
1848 sink = true,
1849 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1850 descriptor(crate::builtins::io::data::DATATX_RESIZE_DESCRIPTOR),
1851 builtin_path = "crate::builtins::io::data"
1852)]
1853async fn data_tx_resize_builtin(
1854 base: Value,
1855 array_name: Value,
1856 new_shape: Value,
1857 _rest: Vec<Value>,
1858) -> BuiltinResult<Value> {
1859 let tx_id = tx_id_from_object(&base, "DataTransaction.resize")?;
1860 let array_name = parse_string(&array_name, "DataTransaction.resize arrayName")?;
1861 let shape = parse_shape_from_value(&new_shape)?;
1862 with_tx_mut(&tx_id, |tx| {
1863 if tx.status != TxnStatus::Open {
1864 return Err(data_error(
1865 "DataTransaction.resize: transaction is not open",
1866 ));
1867 }
1868 tx.resizes.push(PendingResize {
1869 array: array_name,
1870 shape,
1871 });
1872 Ok(())
1873 })?;
1874 Ok(Value::Bool(true))
1875}
1876
1877#[runtime_builtin(
1878 name = "DataTransaction.fill",
1879 category = "io/data",
1880 sink = true,
1881 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1882 descriptor(crate::builtins::io::data::DATATX_FILL_DESCRIPTOR),
1883 builtin_path = "crate::builtins::io::data"
1884)]
1885async fn data_tx_fill_builtin(
1886 base: Value,
1887 array_name: Value,
1888 value: Value,
1889 rest: Vec<Value>,
1890) -> BuiltinResult<Value> {
1891 let tx_id = tx_id_from_object(&base, "DataTransaction.fill")?;
1892 let array_name = parse_string(&array_name, "DataTransaction.fill arrayName")?;
1893 let slice_spec = rest.first().cloned();
1894 with_tx_mut(&tx_id, |tx| {
1895 if tx.status != TxnStatus::Open {
1896 return Err(data_error("DataTransaction.fill: transaction is not open"));
1897 }
1898 tx.fills.push(PendingFill {
1899 array: array_name,
1900 slice_spec,
1901 value,
1902 });
1903 Ok(())
1904 })?;
1905 Ok(Value::Bool(true))
1906}
1907
1908#[runtime_builtin(
1909 name = "DataTransaction.delete_array",
1910 category = "io/data",
1911 sink = true,
1912 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1913 descriptor(crate::builtins::io::data::DATATX_DELETE_ARRAY_DESCRIPTOR),
1914 builtin_path = "crate::builtins::io::data"
1915)]
1916async fn data_tx_delete_array_builtin(base: Value, array_name: Value) -> BuiltinResult<Value> {
1917 let tx_id = tx_id_from_object(&base, "DataTransaction.delete_array")?;
1918 let array_name = parse_string(&array_name, "DataTransaction.delete_array arrayName")?;
1919 with_tx_mut(&tx_id, |tx| {
1920 if tx.status != TxnStatus::Open {
1921 return Err(data_error(
1922 "DataTransaction.delete_array: transaction is not open",
1923 ));
1924 }
1925 tx.delete_arrays.push(array_name);
1926 Ok(())
1927 })?;
1928 Ok(Value::Bool(true))
1929}
1930
1931#[runtime_builtin(
1932 name = "DataTransaction.create_array",
1933 category = "io/data",
1934 sink = true,
1935 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1936 descriptor(crate::builtins::io::data::DATATX_CREATE_ARRAY_DESCRIPTOR),
1937 builtin_path = "crate::builtins::io::data"
1938)]
1939async fn data_tx_create_array_builtin(
1940 base: Value,
1941 array_name: Value,
1942 meta: Value,
1943) -> BuiltinResult<Value> {
1944 let tx_id = tx_id_from_object(&base, "DataTransaction.create_array")?;
1945 let array_name = parse_string(&array_name, "DataTransaction.create_array arrayName")?;
1946 let meta = parse_array_meta(&array_name, &meta)?;
1947 with_tx_mut(&tx_id, |tx| {
1948 if tx.status != TxnStatus::Open {
1949 return Err(data_error(
1950 "DataTransaction.create_array: transaction is not open",
1951 ));
1952 }
1953 tx.create_arrays.push(PendingCreateArray {
1954 array: array_name,
1955 meta,
1956 });
1957 Ok(())
1958 })?;
1959 Ok(Value::Bool(true))
1960}
1961
1962#[runtime_builtin(
1963 name = "DataTransaction.commit",
1964 category = "io/data",
1965 sink = true,
1966 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
1967 descriptor(crate::builtins::io::data::DATATX_COMMIT_DESCRIPTOR),
1968 builtin_path = "crate::builtins::io::data"
1969)]
1970async fn data_tx_commit_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
1971 let tx_id = tx_id_from_object(&base, "DataTransaction.commit")?;
1972 let (dataset_path, base_sequence, writes, resizes, fills, create_arrays, delete_arrays, attrs) =
1973 with_tx(&tx_id, |tx| {
1974 if tx.status != TxnStatus::Open {
1975 return Err(data_error(
1976 "DataTransaction.commit: transaction is not open",
1977 ));
1978 }
1979 Ok((
1980 tx.dataset_path.clone(),
1981 tx.base_sequence,
1982 tx.writes.clone(),
1983 tx.resizes.clone(),
1984 tx.fills.clone(),
1985 tx.create_arrays.clone(),
1986 tx.delete_arrays.clone(),
1987 tx.attrs.clone(),
1988 ))
1989 })?;
1990
1991 let write_ops = writes.len();
1992 let resize_ops = resizes.len();
1993 let fill_ops = fills.len();
1994 let create_ops = create_arrays.len();
1995 let delete_ops = delete_arrays.len();
1996 let attr_updates = attrs.len();
1997
1998 let root = dataset_root(&dataset_path);
1999 let mut manifest = read_manifest_async(&root).await?;
2000 ensure_manifest_sequence(base_sequence, &manifest)?;
2001 if let Some(Value::Struct(options)) = rest.first() {
2002 if let Some(expected) = options.fields.get("if_manifest") {
2003 let expected = parse_string(expected, "DataTransaction.commit if_manifest")?;
2004 let actual = manifest_version_token(&manifest);
2005 if expected != actual {
2006 tracing::warn!(
2007 target: "runmat.data",
2008 tx_id = tx_id,
2009 expected_manifest = expected,
2010 actual_manifest = actual,
2011 "data transaction manifest conflict"
2012 );
2013 return Err(data_error(
2014 "MANIFEST_CONFLICT: if_manifest precondition failed",
2015 ));
2016 }
2017 }
2018 }
2019 for create in create_arrays {
2020 create_array_in_manifest(&root, &mut manifest, &create.array, create.meta).await?;
2021 }
2022 for resize in resizes {
2023 resize_array_in_manifest(&root, &mut manifest, &resize.array, resize.shape).await?;
2024 }
2025 for fill in fills {
2026 fill_array_in_manifest(
2027 &root,
2028 &mut manifest,
2029 &fill.array,
2030 fill.slice_spec.as_ref(),
2031 &fill.value,
2032 )
2033 .await?;
2034 }
2035 for write in writes {
2036 apply_write_to_manifest_async(
2037 &root,
2038 &mut manifest,
2039 &write.array,
2040 write.slice_spec.as_ref(),
2041 &write.value,
2042 )
2043 .await?;
2044 }
2045 for array_name in delete_arrays {
2046 delete_array_in_manifest_async(&root, &mut manifest, &array_name).await?;
2047 }
2048 for (k, v) in attrs {
2049 manifest.attrs.insert(k, value_to_json(&v));
2050 }
2051 manifest.updated_at = now_rfc3339();
2052 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
2053 write_manifest_async(&root, &manifest).await?;
2054 with_tx_mut(&tx_id, |tx| {
2055 tx.status = TxnStatus::Committed;
2056 Ok(())
2057 })?;
2058 tracing::info!(
2059 target: "runmat.data",
2060 dataset = dataset_path,
2061 tx_id = tx_id,
2062 write_ops = write_ops,
2063 resize_ops = resize_ops,
2064 fill_ops = fill_ops,
2065 create_ops = create_ops,
2066 delete_ops = delete_ops,
2067 attr_updates = attr_updates,
2068 next_sequence = manifest.txn_sequence,
2069 "data transaction commit"
2070 );
2071 remove_tx(&tx_id);
2072 Ok(Value::Bool(true))
2073}
2074
2075#[runtime_builtin(
2076 name = "commit",
2077 category = "io/data",
2078 sink = true,
2079 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
2080 descriptor(crate::builtins::io::data::COMMIT_ALIAS_DESCRIPTOR),
2081 builtin_path = "crate::builtins::io::data"
2082)]
2083async fn data_tx_commit_alias_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
2084 match &base {
2085 Value::Object(obj) if obj.class_name == "DataTransaction" => {
2086 data_tx_commit_builtin(base, rest).await
2087 }
2088 Value::HandleObject(handle) if handle.class_name == "DataTransaction" => {
2089 data_tx_commit_builtin(base, rest).await
2090 }
2091 _ => Err(data_error(
2092 "commit: receiver must be a DataTransaction (use tx = ds.begin())",
2093 )),
2094 }
2095}
2096
2097#[runtime_builtin(
2098 name = "DataTransaction.abort",
2099 category = "io/data",
2100 sink = true,
2101 type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
2102 descriptor(crate::builtins::io::data::DATATX_ABORT_DESCRIPTOR),
2103 builtin_path = "crate::builtins::io::data"
2104)]
2105async fn data_tx_abort_builtin(base: Value) -> BuiltinResult<Value> {
2106 let tx_id = tx_id_from_object(&base, "DataTransaction.abort")?;
2107 with_tx_mut(&tx_id, |tx| {
2108 tx.status = TxnStatus::Aborted;
2109 Ok(())
2110 })?;
2111 tracing::info!(
2112 target: "runmat.data",
2113 tx_id = tx_id,
2114 "data transaction abort"
2115 );
2116 remove_tx(&tx_id);
2117 Ok(Value::Bool(true))
2118}
2119
2120#[runtime_builtin(
2121 name = "DataTransaction.status",
2122 category = "io/data",
2123 type_resolver(crate::builtins::io::type_resolvers::data_string_type),
2124 descriptor(crate::builtins::io::data::DATATX_STATUS_DESCRIPTOR),
2125 builtin_path = "crate::builtins::io::data"
2126)]
2127async fn data_tx_status_builtin(base: Value) -> BuiltinResult<Value> {
2128 let tx_id = tx_id_from_object(&base, "DataTransaction.status")?;
2129 with_tx(&tx_id, |tx| {
2130 let status = match tx.status {
2131 TxnStatus::Open => "open",
2132 TxnStatus::Committed => "committed",
2133 TxnStatus::Aborted => "aborted",
2134 };
2135 Ok(Value::String(status.to_string()))
2136 })
2137}
2138
2139fn dataset_path_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
2140 let obj = as_object(base, context)?;
2141 parse_string(get_object_prop(obj, "__data_path")?, context)
2142}
2143
2144fn tx_id_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
2145 let obj = as_object(base, context)?;
2146 parse_string(get_object_prop(obj, "__tx_id")?, context)
2147}
2148
2149fn array_identity(base: &Value, context: &str) -> BuiltinResult<(String, String)> {
2150 let obj = as_object(base, context)?;
2151 let path = parse_string(get_object_prop(obj, "__data_path")?, context)?;
2152 let name = parse_string(get_object_prop(obj, "__array_name")?, context)?;
2153 Ok((path, name))
2154}
2155
2156fn as_object<'a>(value: &'a Value, context: &str) -> BuiltinResult<&'a ObjectInstance> {
2157 match value {
2158 Value::Object(obj) => Ok(obj),
2159 _ => Err(data_error(format!("{context}: expected object receiver"))),
2160 }
2161}
2162
2163async fn hydrate_dataset_descriptor_async(path: &str, dataset: &mut Value) {
2164 let request = runmat_filesystem::data_contract::DataManifestRequest {
2165 path: path.to_string(),
2166 version: None,
2167 };
2168 let descriptor = match runmat_filesystem::data_manifest_descriptor_async(&request).await {
2169 Ok(descriptor) => descriptor,
2170 Err(_) => return,
2171 };
2172 let Value::Object(obj) = dataset else {
2173 return;
2174 };
2175 if !descriptor.dataset_id.is_empty() {
2176 obj.properties.insert(
2177 "__data_id".to_string(),
2178 Value::String(descriptor.dataset_id),
2179 );
2180 }
2181 obj.properties.insert(
2182 "__data_version".to_string(),
2183 Value::String(format!(
2184 "{}:{}",
2185 descriptor.updated_at, descriptor.txn_sequence
2186 )),
2187 );
2188}
2189
/// Replaces every character other than ASCII letters, digits, `-` and `_` with `_`.
///
/// The replacement is one `_` per `char`, so a multi-byte character becomes one underscore.
fn sanitize_label(label: &str) -> String {
    let mut cleaned = String::with_capacity(label.len());
    for ch in label.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        cleaned.push(if keep { ch } else { '_' });
    }
    cleaned
}
2202
2203async fn copy_file(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
2204 let bytes = runmat_filesystem::read_async(src)
2205 .await
2206 .map_err(|err| data_error(format!("failed to open '{}': {err}", src.display())))?;
2207 let parent = dst.parent().ok_or_else(|| {
2208 data_error(format!(
2209 "invalid destination path '{}': missing parent",
2210 dst.display()
2211 ))
2212 })?;
2213 runmat_filesystem::create_dir_all_async(parent)
2214 .await
2215 .map_err(|err| data_error(format!("failed to create '{}': {err}", parent.display())))?;
2216 runmat_filesystem::write_async(dst, &bytes)
2217 .await
2218 .map_err(|err| {
2219 data_error(format!(
2220 "failed to copy '{}' -> '{}': {err}",
2221 src.display(),
2222 dst.display()
2223 ))
2224 })?;
2225 Ok(())
2226}
2227
2228fn make_rel_data_path(
2229 root: &std::path::Path,
2230 payload_path: &std::path::Path,
2231) -> BuiltinResult<String> {
2232 let rel = payload_path
2233 .strip_prefix(root)
2234 .map_err(|err| data_error(format!("failed to compute relative data path: {err}")))?;
2235 Ok(rel.to_string_lossy().to_string())
2236}
2237
2238async fn create_array_in_manifest(
2239 root: &std::path::Path,
2240 manifest: &mut DataManifest,
2241 array_name: &str,
2242 mut meta: DataArrayMeta,
2243) -> BuiltinResult<()> {
2244 if manifest.arrays.contains_key(array_name) {
2245 return Err(data_error(format!(
2246 "DataTransaction.create_array: array '{array_name}' already exists"
2247 )));
2248 }
2249 let payload = DataArrayPayload {
2250 dtype: meta.dtype.clone(),
2251 shape: meta.shape.clone(),
2252 values: vec![0.0; meta.shape.iter().copied().product()],
2253 };
2254 let (payload_path, chunk_index_path) =
2255 write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
2256 meta.data_path = make_rel_data_path(root, &payload_path)?;
2257 meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2258 manifest.arrays.insert(array_name.to_string(), meta);
2259 Ok(())
2260}
2261
2262async fn resize_array_in_manifest(
2263 root: &std::path::Path,
2264 manifest: &mut DataManifest,
2265 array_name: &str,
2266 shape: Vec<usize>,
2267) -> BuiltinResult<()> {
2268 let meta = manifest
2269 .arrays
2270 .get_mut(array_name)
2271 .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2272 meta.shape = shape.clone();
2273 let payload = DataArrayPayload {
2274 dtype: meta.dtype.clone(),
2275 shape: shape.clone(),
2276 values: vec![0.0; shape.iter().copied().product()],
2277 };
2278 let (payload_path, chunk_index_path) =
2279 write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
2280 meta.data_path = make_rel_data_path(root, &payload_path)?;
2281 meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2282 Ok(())
2283}
2284
2285async fn fill_array_in_manifest(
2286 root: &std::path::Path,
2287 manifest: &mut DataManifest,
2288 array_name: &str,
2289 slice_spec: Option<&Value>,
2290 value: &Value,
2291) -> BuiltinResult<()> {
2292 let meta: DataArrayMeta = manifest
2293 .arrays
2294 .get(array_name)
2295 .cloned()
2296 .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2297 let scalar = scalar_to_f64(value)?;
2298 let payload = read_array_payload_async(root, &meta).await?;
2299 let next_payload = if let Some(slice_spec) = slice_spec {
2300 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2301 let target_shape: Vec<usize> = ranges
2302 .iter()
2303 .map(|r| r.end.saturating_sub(r.start))
2304 .collect();
2305 let rhs = Value::Tensor(
2306 Tensor::new(
2307 vec![scalar; target_shape.iter().copied().product()],
2308 target_shape,
2309 )
2310 .map_err(|err| data_error(format!("DataTransaction.fill: {err}")))?,
2311 );
2312 write_slice_payload(&payload, slice_spec, &rhs)?
2313 } else {
2314 DataArrayPayload {
2315 dtype: payload.dtype,
2316 shape: payload.shape.clone(),
2317 values: vec![scalar; payload.shape.iter().copied().product()],
2318 }
2319 };
2320 let (payload_path, chunk_index_path) =
2321 write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
2322 if let Some(updated) = manifest.arrays.get_mut(array_name) {
2323 updated.shape = next_payload.shape.clone();
2324 updated.data_path = make_rel_data_path(root, &payload_path)?;
2325 updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2326 }
2327 Ok(())
2328}
2329
2330async fn delete_array_in_manifest_async(
2331 root: &std::path::Path,
2332 manifest: &mut DataManifest,
2333 array_name: &str,
2334) -> BuiltinResult<()> {
2335 let removed = manifest.arrays.remove(array_name);
2336 if removed.is_none() {
2337 return Err(data_error(format!(
2338 "DataTransaction.delete_array: array '{array_name}' not found"
2339 )));
2340 }
2341 let array_dir = root.join("arrays").join(array_name);
2342 if runmat_filesystem::metadata_async(&array_dir).await.is_ok() {
2343 runmat_filesystem::remove_dir_all_async(&array_dir)
2344 .await
2345 .map_err(|err| {
2346 data_error(format!(
2347 "DataTransaction.delete_array: failed to remove '{}': {err}",
2348 array_dir.display()
2349 ))
2350 })?;
2351 }
2352 Ok(())
2353}
2354
2355fn parse_array_meta(array_name: &str, meta: &Value) -> BuiltinResult<DataArrayMeta> {
2356 let Value::Struct(meta_struct) = meta else {
2357 return Err(data_error(
2358 "DataTransaction.create_array: meta must be a struct",
2359 ));
2360 };
2361 let dtype = meta_struct
2362 .fields
2363 .get("dtype")
2364 .map(|v| parse_string(v, "DataTransaction.create_array dtype"))
2365 .transpose()?
2366 .unwrap_or_else(|| "f64".to_string());
2367 let shape = meta_struct
2368 .fields
2369 .get("shape")
2370 .map(parse_shape_from_value)
2371 .transpose()?
2372 .unwrap_or_else(|| vec![0, 0]);
2373 let chunk_shape = meta_struct
2374 .fields
2375 .get("chunk")
2376 .map(parse_shape_from_value)
2377 .transpose()?
2378 .unwrap_or_else(|| default_chunk_shape(&shape));
2379 let codec = meta_struct
2380 .fields
2381 .get("codec")
2382 .map(|v| parse_string(v, "DataTransaction.create_array codec"))
2383 .transpose()?
2384 .unwrap_or_else(|| "zstd".to_string());
2385 Ok(DataArrayMeta {
2386 dtype,
2387 shape,
2388 chunk_shape,
2389 order: "column_major".to_string(),
2390 codec,
2391 chunk_index_path: Some(format!("arrays/{array_name}/chunks/index.json")),
2392 data_path: format!("arrays/{array_name}/data.f64.json"),
2393 })
2394}
2395
/// Picks a default chunk shape for an array of the given `shape`.
///
/// The result depends on the rank:
/// - rank 0: `[1024]`;
/// - rank 1: the extent clamped to `1..=65_536`;
/// - higher ranks: the first two dims clamped to `1..=256` and every further dim clamped
///   to `1..=8`.
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape {
        [] => vec![1024],
        [only] => vec![(*only).clamp(1, 65_536)],
        [rows, cols, rest @ ..] => {
            let mut chunk = Vec::with_capacity(shape.len());
            chunk.push((*rows).clamp(1, 256));
            chunk.push((*cols).clamp(1, 256));
            chunk.extend(rest.iter().map(|dim| (*dim).clamp(1, 8)));
            chunk
        }
    }
}
2412
2413#[async_recursion::async_recursion(?Send)]
2414async fn copy_dir_recursive(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
2415 let metadata = runmat_filesystem::metadata_async(src)
2416 .await
2417 .map_err(|err| data_error(format!("failed to stat '{}': {err}", src.display())))?;
2418 if !metadata.is_dir() {
2419 return Err(data_error(format!(
2420 "expected dataset directory at '{}'",
2421 src.display()
2422 )));
2423 }
2424 runmat_filesystem::create_dir_all_async(dst)
2425 .await
2426 .map_err(|err| data_error(format!("failed to create '{}': {err}", dst.display())))?;
2427 for entry in runmat_filesystem::read_dir_async(src)
2428 .await
2429 .map_err(|err| data_error(format!("failed to read '{}': {err}", src.display())))?
2430 {
2431 let entry_src = entry.path().to_path_buf();
2432 let entry_dst = dst.join(entry.file_name());
2433 if entry.is_dir() {
2434 copy_dir_recursive(&entry_src, &entry_dst).await?;
2435 continue;
2436 }
2437 copy_file(&entry_src, &entry_dst).await?;
2438 }
2439 Ok(())
2440}
2441
2442fn parse_shape_from_value(value: &Value) -> BuiltinResult<Vec<usize>> {
2443 match value {
2444 Value::Tensor(t) => {
2445 let mut out = Vec::with_capacity(t.data.len());
2446 for v in &t.data {
2447 if !v.is_finite() || *v < 0.0 {
2448 return Err(data_error(
2449 "shape dimensions must be non-negative finite numbers",
2450 ));
2451 }
2452 out.push(*v as usize);
2453 }
2454 Ok(out)
2455 }
2456 Value::Num(v) => {
2457 if !v.is_finite() || *v < 0.0 {
2458 return Err(data_error(
2459 "shape dimensions must be non-negative finite numbers",
2460 ));
2461 }
2462 Ok(vec![*v as usize])
2463 }
2464 Value::Int(v) => {
2465 let n = v.to_i64();
2466 if n < 0 {
2467 return Err(data_error("shape dimensions must be non-negative"));
2468 }
2469 Ok(vec![n as usize])
2470 }
2471 _ => Err(data_error("shape must be a numeric vector")),
2472 }
2473}
2474
2475fn scalar_to_f64(value: &Value) -> BuiltinResult<f64> {
2476 match value {
2477 Value::Num(v) => Ok(*v),
2478 Value::Int(v) => Ok(v.to_i64() as f64),
2479 _ => Err(data_error("expected numeric scalar")),
2480 }
2481}
2482
2483async fn write_array_full_async(
2484 dataset_path: &str,
2485 array_name: &str,
2486 slice_spec: Option<&Value>,
2487 value: &Value,
2488) -> BuiltinResult<()> {
2489 let root = dataset_root(dataset_path);
2490 let mut manifest = read_manifest_async(&root).await?;
2491 apply_write_to_manifest_async(&root, &mut manifest, array_name, slice_spec, value).await?;
2492 manifest.updated_at = now_rfc3339();
2493 manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
2494 write_manifest_async(&root, &manifest).await
2495}
2496
2497async fn apply_write_to_manifest_async(
2498 root: &std::path::Path,
2499 manifest: &mut DataManifest,
2500 array_name: &str,
2501 slice_spec: Option<&Value>,
2502 value: &Value,
2503) -> BuiltinResult<()> {
2504 let meta: DataArrayMeta = manifest
2505 .arrays
2506 .get(array_name)
2507 .cloned()
2508 .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
2509
2510 if let Some(slice_spec) = slice_spec {
2511 if apply_slice_write_chunked_async(root, manifest, array_name, &meta, slice_spec, value)
2512 .await?
2513 {
2514 return Ok(());
2515 }
2516 }
2517
2518 let payload = read_array_payload_async(root, &meta).await?;
2519 let mut next_payload = payload.clone();
2520 if let Some(slice_spec) = slice_spec {
2521 next_payload = write_slice_payload(&payload, slice_spec, value)?;
2522 } else {
2523 let (shape, values) = value_to_tensor_shape_values(value)?;
2524 next_payload.shape = shape;
2525 next_payload.values = values;
2526 }
2527
2528 let (payload_path, chunk_index_path) =
2529 write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
2530 if let Some(updated) = manifest.arrays.get_mut(array_name) {
2531 updated.shape = next_payload.shape.clone();
2532 updated.data_path = make_rel_data_path(root, &payload_path)?;
2533 updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
2534 }
2535 Ok(())
2536}
2537
2538async fn apply_slice_write_chunked_async(
2539 root: &std::path::Path,
2540 manifest: &mut DataManifest,
2541 array_name: &str,
2542 meta: &DataArrayMeta,
2543 slice_spec: &Value,
2544 value: &Value,
2545) -> BuiltinResult<bool> {
2546 let Some(index_rel_path) = &meta.chunk_index_path else {
2547 return Ok(false);
2548 };
2549 let index_path = root.join(index_rel_path);
2550 if runmat_filesystem::metadata_async(&index_path)
2551 .await
2552 .is_err()
2553 {
2554 return Ok(false);
2555 }
2556 let ranges = parse_slice_spec(slice_spec, &meta.shape)?;
2557 let rhs_shape: Vec<usize> = ranges
2558 .iter()
2559 .map(|r| r.end.saturating_sub(r.start))
2560 .collect();
2561 let (actual_rhs_shape, rhs_values) = value_to_tensor_shape_values(value)?;
2562 if actual_rhs_shape != rhs_shape {
2563 return Err(data_error(format!(
2564 "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
2565 actual_rhs_shape, rhs_shape
2566 )));
2567 }
2568
2569 let index_bytes = runmat_filesystem::read_async(&index_path)
2570 .await
2571 .map_err(|err| {
2572 data_error(format!(
2573 "failed to read chunk index '{}': {err}",
2574 index_path.display()
2575 ))
2576 })?;
2577 let mut chunk_index: DataChunkIndex = serde_json::from_slice(&index_bytes).map_err(|err| {
2578 data_error(format!(
2579 "failed to parse chunk index '{}': {err}",
2580 index_path.display()
2581 ))
2582 })?;
2583
2584 let mut pos_by_key = HashMap::new();
2585 for (idx, entry) in chunk_index.chunks.iter().enumerate() {
2586 pos_by_key.insert(entry.key.clone(), idx);
2587 }
2588 let touched = touched_chunk_coords(&ranges, &meta.chunk_shape, &meta.shape);
2589 let mut upload_batch = Vec::<(DataChunkDescriptor, Vec<u8>)>::new();
2590
2591 for coords in touched {
2592 let key = chunk_key(&coords);
2593 let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
2594 let chunk_extent = chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape);
2595 let intersection = chunk_intersection(&ranges, &chunk_start, &chunk_extent);
2596 if intersection.is_empty() {
2597 continue;
2598 }
2599
2600 let (entry_index, existed, mut entry, mut chunk_payload) = load_or_init_chunk(
2601 root,
2602 array_name,
2603 &key,
2604 &coords,
2605 &chunk_extent,
2606 &pos_by_key,
2607 &chunk_index,
2608 )
2609 .await?;
2610
2611 let mut local = vec![0usize; intersection.len()];
2612 let intersection_shape: Vec<usize> = intersection
2613 .iter()
2614 .map(|r| r.end.saturating_sub(r.start))
2615 .collect();
2616 loop {
2617 let mut global = Vec::with_capacity(intersection.len());
2618 for dim in 0..intersection.len() {
2619 global.push(intersection[dim].start + local[dim]);
2620 }
2621 let rhs_index: Vec<usize> = global
2622 .iter()
2623 .enumerate()
2624 .map(|(dim, g)| g.saturating_sub(ranges[dim].start))
2625 .collect();
2626 let chunk_index_local: Vec<usize> = global
2627 .iter()
2628 .enumerate()
2629 .map(|(dim, g)| g.saturating_sub(chunk_start[dim]))
2630 .collect();
2631 let rhs_linear = linear_index_column_major(&rhs_index, &rhs_shape)?;
2632 let chunk_linear = linear_index_column_major(&chunk_index_local, &chunk_extent)?;
2633 chunk_payload.values[chunk_linear] = rhs_values[rhs_linear];
2634 if !advance_index(&mut local, &intersection_shape) {
2635 break;
2636 }
2637 }
2638
2639 let chunk_bytes = serde_json::to_vec(&chunk_payload)
2640 .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
2641 let chunk_path = root.join(&entry.data_path);
2642 runmat_filesystem::write_async(&chunk_path, &chunk_bytes)
2643 .await
2644 .map_err(|err| {
2645 data_error(format!(
2646 "failed to write chunk payload '{}': {err}",
2647 chunk_path.display()
2648 ))
2649 })?;
2650
2651 entry.coords = coords.clone();
2652 entry.shape = chunk_extent.clone();
2653 entry.bytes_raw = chunk_bytes.len() as u64;
2654 entry.bytes_stored = chunk_bytes.len() as u64;
2655 entry.hash = sha256_hex(&chunk_bytes);
2656 if existed {
2657 chunk_index.chunks[entry_index] = entry.clone();
2658 } else {
2659 chunk_index.chunks.push(entry.clone());
2660 pos_by_key.insert(key.clone(), chunk_index.chunks.len() - 1);
2661 }
2662 upload_batch.push((
2663 DataChunkDescriptor {
2664 key: key.clone(),
2665 object_id: entry.object_id.clone(),
2666 hash: entry.hash.clone(),
2667 bytes_raw: entry.bytes_raw,
2668 bytes_stored: entry.bytes_stored,
2669 },
2670 chunk_bytes,
2671 ));
2672 }
2673
2674 maybe_upload_chunk_batch_async(root, array_name, upload_batch).await?;
2675 tracing::info!(
2676 target: "runmat.data",
2677 dataset = %root.display(),
2678 array = array_name,
2679 touched_chunks = chunk_index.chunks.len(),
2680 "chunked slice write committed"
2681 );
2682 let index_write = serde_json::to_vec(&chunk_index)
2683 .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
2684 runmat_filesystem::write_async(&index_path, &index_write)
2685 .await
2686 .map_err(|err| {
2687 data_error(format!(
2688 "failed to write chunk index '{}': {err}",
2689 index_path.display()
2690 ))
2691 })?;
2692
2693 if let Some(updated) = manifest.arrays.get_mut(array_name) {
2694 updated.shape = meta.shape.clone();
2695 updated.chunk_index_path = Some(index_rel_path.clone());
2696 }
2697 Ok(true)
2698}
2699
2700fn value_to_tensor_shape_values(value: &Value) -> BuiltinResult<(Vec<usize>, Vec<f64>)> {
2701 match value {
2702 Value::Tensor(t) => Ok((t.shape.clone(), t.data.clone())),
2703 Value::Num(n) => Ok((vec![1, 1], vec![*n])),
2704 Value::Int(i) => Ok((vec![1, 1], vec![i.to_i64() as f64])),
2705 _ => Err(data_error(
2706 "DataArray.write supports tensor or numeric scalar values",
2707 )),
2708 }
2709}
2710
/// Half-open, zero-based index interval `[start, end)` selected along one dimension.
#[derive(Debug, Clone, Copy)]
struct DimRange {
    /// First selected index (inclusive).
    start: usize,
    /// One past the last selected index (exclusive).
    end: usize,
}
2716
2717fn read_slice_payload(
2718 payload: &DataArrayPayload,
2719 slice_spec: &Value,
2720) -> BuiltinResult<DataArrayPayload> {
2721 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2722 let out_shape: Vec<usize> = ranges
2723 .iter()
2724 .map(|r| r.end.saturating_sub(r.start))
2725 .collect();
2726 let mut out_values = Vec::new();
2727 let mut out_index = vec![0usize; out_shape.len()];
2728 loop {
2729 let source_index: Vec<usize> = out_index
2730 .iter()
2731 .enumerate()
2732 .map(|(dim, idx)| ranges[dim].start + *idx)
2733 .collect();
2734 let linear = linear_index_column_major(&source_index, &payload.shape)?;
2735 out_values.push(payload.values[linear]);
2736
2737 if !advance_index(&mut out_index, &out_shape) {
2738 break;
2739 }
2740 }
2741 Ok(DataArrayPayload {
2742 dtype: payload.dtype.clone(),
2743 shape: out_shape,
2744 values: out_values,
2745 })
2746}
2747
2748fn write_slice_payload(
2749 payload: &DataArrayPayload,
2750 slice_spec: &Value,
2751 rhs: &Value,
2752) -> BuiltinResult<DataArrayPayload> {
2753 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
2754 let target_shape: Vec<usize> = ranges
2755 .iter()
2756 .map(|r| r.end.saturating_sub(r.start))
2757 .collect();
2758 let (rhs_shape, rhs_values) = value_to_tensor_shape_values(rhs)?;
2759 if rhs_shape != target_shape {
2760 return Err(data_error(format!(
2761 "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
2762 rhs_shape, target_shape
2763 )));
2764 }
2765
2766 let mut next = payload.values.clone();
2767 let mut rhs_index = vec![0usize; target_shape.len()];
2768 let mut rhs_linear = 0usize;
2769 loop {
2770 let target_index: Vec<usize> = rhs_index
2771 .iter()
2772 .enumerate()
2773 .map(|(dim, idx)| ranges[dim].start + *idx)
2774 .collect();
2775 let target_linear = linear_index_column_major(&target_index, &payload.shape)?;
2776 next[target_linear] = rhs_values[rhs_linear];
2777 rhs_linear += 1;
2778
2779 if !advance_index(&mut rhs_index, &target_shape) {
2780 break;
2781 }
2782 }
2783
2784 Ok(DataArrayPayload {
2785 dtype: payload.dtype.clone(),
2786 shape: payload.shape.clone(),
2787 values: next,
2788 })
2789}
2790
2791fn parse_slice_spec(slice_spec: &Value, shape: &[usize]) -> BuiltinResult<Vec<DimRange>> {
2792 match slice_spec {
2793 Value::Cell(cell) => {
2794 if cell.data.is_empty() {
2795 return Err(data_error("INVALID_SLICE: empty slice specification"));
2796 }
2797 let mut ranges = Vec::with_capacity(shape.len());
2798 for (dim, extent) in shape.iter().enumerate() {
2799 if let Some(item) = cell.data.get(dim).map(|v| &**v) {
2800 ranges.push(parse_dim_range(item, *extent)?);
2801 } else {
2802 ranges.push(DimRange {
2803 start: 0,
2804 end: *extent,
2805 });
2806 }
2807 }
2808 Ok(ranges)
2809 }
2810 Value::String(s) if s == ":" => Ok(shape
2811 .iter()
2812 .map(|extent| DimRange {
2813 start: 0,
2814 end: *extent,
2815 })
2816 .collect()),
2817 _ => Err(data_error(
2818 "INVALID_SLICE: slice must be a cell spec like {1:10, :} or ':'",
2819 )),
2820 }
2821}
2822
2823fn parse_dim_range(value: &Value, extent: usize) -> BuiltinResult<DimRange> {
2824 if extent == 0 {
2825 return Ok(DimRange { start: 0, end: 0 });
2826 }
2827 match value {
2828 Value::String(s) if s == ":" => Ok(DimRange {
2829 start: 0,
2830 end: extent,
2831 }),
2832 Value::Num(n) => {
2833 let idx = (*n as isize) - 1;
2834 if idx < 0 || idx as usize >= extent {
2835 return Err(data_error("INVALID_SLICE: index out of bounds"));
2836 }
2837 Ok(DimRange {
2838 start: idx as usize,
2839 end: idx as usize + 1,
2840 })
2841 }
2842 Value::Int(i) => {
2843 let idx = i.to_i64() - 1;
2844 if idx < 0 || idx as usize >= extent {
2845 return Err(data_error("INVALID_SLICE: index out of bounds"));
2846 }
2847 Ok(DimRange {
2848 start: idx as usize,
2849 end: idx as usize + 1,
2850 })
2851 }
2852 Value::Tensor(t) if t.data.len() == 2 => {
2853 let start = (t.data[0] as isize) - 1;
2854 let end_inclusive = (t.data[1] as isize) - 1;
2855 if start < 0 || end_inclusive < start || end_inclusive as usize >= extent {
2856 return Err(data_error("INVALID_SLICE: range out of bounds"));
2857 }
2858 Ok(DimRange {
2859 start: start as usize,
2860 end: end_inclusive as usize + 1,
2861 })
2862 }
2863 _ => Err(data_error(
2864 "INVALID_SLICE: dimension must be ':', scalar index, or [start end] range",
2865 )),
2866 }
2867}
2868
2869fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
2870 if index.len() != shape.len() {
2871 return Err(data_error("INVALID_SLICE: rank mismatch"));
2872 }
2873 let mut stride = 1usize;
2874 let mut linear = 0usize;
2875 for (idx, extent) in index.iter().zip(shape.iter()) {
2876 if *idx >= *extent {
2877 return Err(data_error("INVALID_SLICE: index out of bounds"));
2878 }
2879 linear += idx * stride;
2880 stride = stride.saturating_mul(*extent);
2881 }
2882 Ok(linear)
2883}
2884
/// Steps `index` to the next position of `shape` in column-major order (odometer style,
/// dimension 0 first). Returns `false` after wrapping past the last position, leaving
/// `index` at all zeros, or immediately for an empty shape.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    for (dim, &extent) in shape.iter().enumerate() {
        let digit = &mut index[dim];
        *digit += 1;
        if *digit < extent {
            return true;
        }
        *digit = 0;
    }
    false
}
2898
/// Formats chunk grid coordinates as a dotted key, e.g. `[0, 12, 3]` -> `"0.12.3"`.
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (position, coord) in coords.iter().enumerate() {
        if position > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
2906
/// Converts chunk grid coordinates into the array index of the chunk's first element.
/// Missing or zero chunk extents are treated as 1.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(coords.len());
    for (dim, &coord) in coords.iter().enumerate() {
        let step = match chunk_shape.get(dim) {
            Some(&extent) if extent > 0 => extent,
            _ => 1,
        };
        starts.push(coord * step);
    }
    starts
}
2914
/// Computes the actual extent of the chunk starting at `start`. Each extent is the chunk
/// size (missing or zero -> 1), clipped so the chunk does not run past `shape`. It is 0
/// when the start is already past the end.
fn chunk_extent_for_start(start: &[usize], chunk_shape: &[usize], shape: &[usize]) -> Vec<usize> {
    let mut extents = Vec::with_capacity(start.len());
    for (dim, &origin) in start.iter().enumerate() {
        let step = match chunk_shape.get(dim) {
            Some(&extent) if extent > 0 => extent,
            _ => 1,
        };
        let reach = origin + step;
        let end = reach.min(shape[dim]);
        extents.push(end.saturating_sub(origin));
    }
    extents
}
2926
2927fn chunk_intersection(
2928 ranges: &[DimRange],
2929 chunk_start: &[usize],
2930 chunk_extent: &[usize],
2931) -> Vec<DimRange> {
2932 let mut out = Vec::with_capacity(ranges.len());
2933 for dim in 0..ranges.len() {
2934 let c_start = chunk_start[dim];
2935 let c_end = c_start + chunk_extent[dim];
2936 let start = ranges[dim].start.max(c_start);
2937 let end = ranges[dim].end.min(c_end);
2938 if start >= end {
2939 return Vec::new();
2940 }
2941 out.push(DimRange { start, end });
2942 }
2943 out
2944}
2945
2946fn touched_chunk_coords(
2947 ranges: &[DimRange],
2948 chunk_shape: &[usize],
2949 shape: &[usize],
2950) -> Vec<Vec<usize>> {
2951 let mut span = Vec::with_capacity(ranges.len());
2952 let mut begin = Vec::with_capacity(ranges.len());
2953 for dim in 0..ranges.len() {
2954 if shape[dim] == 0 {
2955 return Vec::new();
2956 }
2957 let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
2958 let first = ranges[dim].start / chunk;
2959 let last = (ranges[dim].end.saturating_sub(1)) / chunk;
2960 begin.push(first);
2961 span.push(last.saturating_sub(first) + 1);
2962 }
2963 let mut local = vec![0usize; span.len()];
2964 let mut out = Vec::new();
2965 loop {
2966 out.push(
2967 local
2968 .iter()
2969 .enumerate()
2970 .map(|(dim, v)| begin[dim] + *v)
2971 .collect::<Vec<_>>(),
2972 );
2973 if !advance_index(&mut local, &span) {
2974 break;
2975 }
2976 }
2977 out
2978}
2979
2980async fn maybe_upload_chunk_batch_async(
2981 root: &std::path::Path,
2982 array_name: &str,
2983 batch: Vec<(DataChunkDescriptor, Vec<u8>)>,
2984) -> BuiltinResult<()> {
2985 if batch.is_empty() {
2986 return Ok(());
2987 }
2988 let request = DataChunkUploadRequest {
2989 dataset_path: root.to_string_lossy().to_string(),
2990 array: array_name.to_string(),
2991 chunks: batch.iter().map(|(d, _)| d.clone()).collect(),
2992 };
2993 let targets = match runmat_filesystem::data_chunk_upload_targets_async(&request).await {
2994 Ok(targets) => targets,
2995 Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
2996 Err(err) => {
2997 return Err(data_error(format!(
2998 "failed to request data chunk upload targets: {err}"
2999 )))
3000 }
3001 };
3002 for (descriptor, bytes) in batch {
3003 let target = targets
3004 .iter()
3005 .find(|t| t.key == descriptor.key)
3006 .ok_or_else(|| {
3007 data_error(format!(
3008 "missing upload target for chunk '{}'",
3009 descriptor.key
3010 ))
3011 })?;
3012 runmat_filesystem::data_upload_chunk_async(target, &bytes)
3013 .await
3014 .map_err(|err| {
3015 data_error(format!(
3016 "failed to upload chunk '{}': {err}",
3017 descriptor.key
3018 ))
3019 })?;
3020 tracing::info!(
3021 target: "runmat.data",
3022 dataset = %root.display(),
3023 array = array_name,
3024 chunk_key = descriptor.key,
3025 bytes = bytes.len(),
3026 "chunk upload completed"
3027 );
3028 }
3029 Ok(())
3030}
3031
/// Builds the dataset-relative location of a chunk payload file:
/// `arrays/<array_name>/chunks/<object_id>.json`, always joined with `/`.
fn chunk_rel_path(array_name: &str, object_id: &str) -> String {
    let file_name = format!("{object_id}.json");
    ["arrays", array_name, "chunks", file_name.as_str()].join("/")
}
3035
3036async fn load_or_init_chunk(
3037 root: &std::path::Path,
3038 array_name: &str,
3039 key: &str,
3040 coords: &[usize],
3041 chunk_extent: &[usize],
3042 pos_by_key: &HashMap<String, usize>,
3043 chunk_index: &DataChunkIndex,
3044) -> BuiltinResult<(usize, bool, DataChunkIndexEntry, DataArrayPayload)> {
3045 if let Some(index) = pos_by_key.get(key).copied() {
3046 let entry = chunk_index
3047 .chunks
3048 .get(index)
3049 .cloned()
3050 .ok_or_else(|| data_error(format!("chunk index missing key '{key}'")))?;
3051 let bytes = runmat_filesystem::read_async(root.join(&entry.data_path))
3052 .await
3053 .map_err(|err| {
3054 data_error(format!(
3055 "failed to read chunk payload '{}': {err}",
3056 entry.data_path
3057 ))
3058 })?;
3059 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
3060 data_error(format!(
3061 "failed to parse chunk payload '{}': {err}",
3062 entry.data_path
3063 ))
3064 })?;
3065 return Ok((index, true, entry, payload));
3066 }
3067
3068 let object_id = format!("obj_{}", key.replace('.', "_"));
3069 let entry = DataChunkIndexEntry {
3070 key: key.to_string(),
3071 object_id: object_id.clone(),
3072 hash: String::new(),
3073 bytes_raw: 0,
3074 bytes_stored: 0,
3075 coords: coords.to_vec(),
3076 shape: chunk_extent.to_vec(),
3077 data_path: chunk_rel_path(array_name, &object_id),
3078 };
3079 let payload = DataArrayPayload {
3080 dtype: "f64".to_string(),
3081 shape: chunk_extent.to_vec(),
3082 values: vec![0.0; chunk_extent.iter().copied().product()],
3083 };
3084 Ok((chunk_index.chunks.len(), false, entry, payload))
3085}
3086
3087fn attrs_to_struct(attrs: &BTreeMap<String, serde_json::Value>) -> Value {
3088 let mut out = StructValue::new();
3089 for (k, v) in attrs {
3090 out.fields.insert(k.clone(), json_to_value(v));
3091 }
3092 Value::Struct(out)
3093}
3094
3095fn value_to_json(value: &Value) -> serde_json::Value {
3096 match value {
3097 Value::String(s) => serde_json::Value::String(s.clone()),
3098 Value::CharArray(ca) => serde_json::Value::String(ca.to_string()),
3099 Value::Num(n) => serde_json::json!(n),
3100 Value::Int(i) => serde_json::json!(i.to_i64()),
3101 Value::Bool(b) => serde_json::json!(b),
3102 _ => serde_json::Value::String(format!("{value:?}")),
3103 }
3104}
3105
3106fn json_to_value(value: &serde_json::Value) -> Value {
3107 match value {
3108 serde_json::Value::Bool(b) => Value::Bool(*b),
3109 serde_json::Value::Number(n) => Value::Num(n.as_f64().unwrap_or_default()),
3110 serde_json::Value::String(s) => Value::String(s.clone()),
3111 serde_json::Value::Array(arr) => {
3112 let vals = arr.iter().map(json_to_value).collect::<Vec<_>>();
3113 crate::make_cell(vals.clone(), 1, vals.len())
3114 .unwrap_or_else(|_| Value::String("<invalid-array>".to_string()))
3115 }
3116 serde_json::Value::Object(map) => {
3117 let mut s = StructValue::new();
3118 for (k, v) in map {
3119 s.fields.insert(k.clone(), json_to_value(v));
3120 }
3121 Value::Struct(s)
3122 }
3123 serde_json::Value::Null => Value::String("".to_string()),
3124 }
3125}
3126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dispatcher::call_builtin;
    use async_trait::async_trait;
    use axum::extract::{Query, State};
    use axum::http::{HeaderMap, StatusCode};
    use axum::routing::{post, put};
    use axum::{Json, Router};
    use runmat_builtins::CellArray;
    use runmat_filesystem::data_contract::{
        DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
    };
    use runmat_filesystem::{
        DirEntry, FileHandle, FsMetadata, FsProvider, NativeFsProvider, OpenFlags,
    };
    use serde::Deserialize;
    use std::path::Path;
    use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
    use tokio::runtime::Runtime;
    use tokio::sync::oneshot;

    /// Chunk keys touched by a `[2 3] x [2 3]` slice of a 4x4 array chunked 2x2.
    const ALL_FOUR_CHUNK_KEYS: [&str; 4] = ["0.0", "0.1", "1.0", "1.1"];

    /// Serializes the tests in this module, because several of them swap the global
    /// filesystem provider.
    fn serial_test_guard() -> MutexGuard<'static, ()> {
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        // A test that panics while holding the guard poisons the lock. Recover the guard,
        // so one real failure does not turn every later test into a "poisoned" failure.
        LOCK.get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Builds array metadata with dtype `f64`, the given `shape` and an optional `chunk`
    /// extent. Both extents are stored as 1xN row tensors.
    fn f64_array_meta(shape: &[f64], chunk: Option<&[f64]>) -> StructValue {
        let mut meta = StructValue::new();
        meta.fields
            .insert("dtype".to_string(), Value::String("f64".to_string()));
        meta.fields.insert(
            "shape".to_string(),
            Value::Tensor(Tensor::new(shape.to_vec(), vec![1, shape.len()]).expect("shape tensor")),
        );
        if let Some(chunk) = chunk {
            meta.fields.insert(
                "chunk".to_string(),
                Value::Tensor(
                    Tensor::new(chunk.to_vec(), vec![1, chunk.len()]).expect("chunk tensor"),
                ),
            );
        }
        meta
    }

    /// Builds a `data.create` schema that declares the single array `name` with `meta`.
    fn single_array_schema(name: &str, meta: StructValue) -> StructValue {
        let mut arrays = StructValue::new();
        arrays
            .fields
            .insert(name.to_string(), Value::Struct(meta));
        let mut schema = StructValue::new();
        schema
            .fields
            .insert("arrays".to_string(), Value::Struct(arrays));
        schema
    }

    /// Returns an empty 1x0 name/value option cell.
    fn no_options() -> Value {
        Value::Cell(CellArray::new(vec![], 1, 0).expect("cell"))
    }

    /// Calls `data.create(path, schema)` with no options and returns the dataset handle.
    fn create_dataset(path: &str, schema: StructValue) -> Value {
        call_builtin(
            "data.create",
            &[
                Value::String(path.to_string()),
                Value::Struct(schema),
                no_options(),
            ],
        )
        .expect("create dataset")
    }

    /// Returns the `DataArray` handle for `name` in dataset `ds`.
    fn dataset_array(ds: Value, name: &str) -> Value {
        call_builtin("Dataset.array", &[ds, Value::String(name.to_string())])
            .expect("dataset array")
    }

    /// Builds a 1xN row tensor value, e.g. an index range or a shape.
    fn row_tensor(values: &[f64]) -> Value {
        Value::Tensor(Tensor::new(values.to_vec(), vec![1, values.len()]).expect("range"))
    }

    /// Wraps one selector per dimension in a 1xN cell slice spec.
    fn slice_spec(dims: Vec<Value>) -> Value {
        let cols = dims.len();
        Value::Cell(CellArray::new(dims, 1, cols).expect("slice cell"))
    }

    /// Creates a dataset at `path` holding a 4x4 `f64` array `temperature`, chunked 2x2.
    /// Writes the values `1..=16` into it and returns the array handle.
    fn create_chunked_temperature(path: &str) -> Value {
        let schema = single_array_schema(
            "temperature",
            f64_array_meta(&[4.0, 4.0], Some(&[2.0, 2.0])),
        );
        let arr = dataset_array(create_dataset(path, schema), "temperature");
        let full = Value::Tensor(
            Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4]).expect("full tensor"),
        );
        call_builtin("DataArray.write", &[arr.clone(), full]).expect("initial write");
        arr
    }

    /// Reads a file through the active filesystem provider, blocking until it completes.
    fn read_bytes(path: &Path) -> Vec<u8> {
        futures::executor::block_on(runmat_filesystem::read_async(path))
            .unwrap_or_else(|err| panic!("read {}: {err}", path.display()))
    }

    /// Forgets every upload key recorded so far.
    fn clear_recorded(keys: &Mutex<Vec<String>>) {
        keys.lock().expect("recorded keys lock").clear();
    }

    /// Returns the recorded upload keys, sorted and deduplicated.
    fn sorted_unique_keys(keys: &Mutex<Vec<String>>) -> Vec<String> {
        let mut keys = keys.lock().expect("recorded keys lock").clone();
        keys.sort();
        keys.dedup();
        keys
    }

    /// Converts a list of expected keys into owned strings, for comparisons.
    fn expected_keys(keys: &[&str]) -> Vec<String> {
        keys.iter().map(|key| key.to_string()).collect()
    }

    #[test]
    fn io_data_descriptors_cover_constructor_and_transaction_surface() {
        let data_labels: Vec<&str> = DATA_CREATE_DESCRIPTOR
            .signatures
            .iter()
            .map(|sig| sig.label)
            .collect();
        assert!(data_labels.contains(&"ds = data.create(path, schema, Name, Value, ...)"));

        let read_labels: Vec<&str> = DATAARRAY_READ_DESCRIPTOR
            .signatures
            .iter()
            .map(|sig| sig.label)
            .collect();
        assert!(read_labels.contains(&"X = DataArray.read(arr, sliceSpec)"));

        let write_labels: Vec<&str> = DATAARRAY_WRITE_DESCRIPTOR
            .signatures
            .iter()
            .map(|sig| sig.label)
            .collect();
        assert!(write_labels.contains(&"tf = DataArray.write(arr, values)"));
        assert!(write_labels.contains(&"tf = DataArray.write(arr, sliceSpec, values)"));

        let tx_labels: Vec<&str> = DATATX_COMMIT_DESCRIPTOR
            .signatures
            .iter()
            .map(|sig| sig.label)
            .collect();
        assert!(tx_labels.contains(&"tf = DataTransaction.commit(tx, Name, Value, ...)"));
    }

    /// Native-filesystem provider that records the keys of uploaded chunks instead of
    /// sending them anywhere.
    #[derive(Default)]
    struct CountingDataUploadProvider {
        inner: NativeFsProvider,
        uploaded_keys: Arc<Mutex<Vec<String>>>,
    }

    /// Native-filesystem provider that forwards data-chunk uploads to an HTTP test server.
    struct HttpDataUploadProvider {
        inner: NativeFsProvider,
        base_url: String,
        client: reqwest::blocking::Client,
    }

    impl HttpDataUploadProvider {
        fn new(base_url: String) -> Self {
            Self {
                inner: NativeFsProvider,
                base_url,
                client: reqwest::blocking::Client::new(),
            }
        }
    }

    #[async_trait(?Send)]
    impl FsProvider for HttpDataUploadProvider {
        fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
            self.inner.open(path, flags)
        }

        async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
            self.inner.read(path).await
        }

        async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
            self.inner.write(path, data).await
        }

        async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_file(path).await
        }

        async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.metadata(path).await
        }

        async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.symlink_metadata(path).await
        }

        async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
            self.inner.read_dir(path).await
        }

        async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
            self.inner.canonicalize(path).await
        }

        async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir(path).await
        }

        async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir_all(path).await
        }

        async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir(path).await
        }

        async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir_all(path).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
            self.inner.set_readonly(path, readonly).await
        }

        async fn data_manifest_descriptor(
            &self,
            request: &DataManifestRequest,
        ) -> std::io::Result<DataManifestDescriptor> {
            self.inner.data_manifest_descriptor(request).await
        }

        async fn data_chunk_upload_targets(
            &self,
            request: &DataChunkUploadRequest,
        ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
            #[derive(Deserialize)]
            struct UploadTargetsResponse {
                targets: Vec<DataChunkUploadTarget>,
            }
            let url = format!("{}/data/chunks/upload-targets", self.base_url);
            let response = self
                .client
                .post(url)
                .json(request)
                .send()
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            if !response.status().is_success() {
                return Err(std::io::Error::other(format!(
                    "upload targets request failed: {}",
                    response.status()
                )));
            }
            let parsed: UploadTargetsResponse = response
                .json()
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            Ok(parsed.targets)
        }

        async fn data_upload_chunk(
            &self,
            target: &DataChunkUploadTarget,
            data: &[u8],
        ) -> std::io::Result<()> {
            let upload_url = if let Some(key) = target.upload_url.strip_prefix("upload://") {
                format!("{}/upload?key={}", self.base_url, key)
            } else {
                target.upload_url.clone()
            };
            let method = reqwest::Method::from_bytes(target.method.as_bytes())
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            let mut request = self.client.request(method, &upload_url);
            for (k, v) in &target.headers {
                request = request.header(k, v);
            }
            let response = request
                .body(data.to_vec())
                .send()
                .map_err(|err| std::io::Error::other(err.to_string()))?;
            if !response.status().is_success() {
                return Err(std::io::Error::other(format!(
                    "chunk upload failed: {}",
                    response.status()
                )));
            }
            Ok(())
        }
    }

    /// Shared state of the test upload server: the chunk keys it has received.
    #[derive(Clone, Default)]
    struct UploadHarness {
        uploads: Arc<Mutex<Vec<String>>>,
    }

    #[derive(Deserialize)]
    struct UploadChunkQuery {
        key: String,
    }

    /// Hands out one `PUT upload://<key>` target per requested chunk, carrying the chunk
    /// hash as the `x-runmat-hash` header.
    async fn upload_targets_handler(
        Json(req): Json<DataChunkUploadRequest>,
    ) -> Result<Json<serde_json::Value>, StatusCode> {
        let targets = req
            .chunks
            .iter()
            .map(|chunk| {
                serde_json::json!({
                    "key": chunk.key,
                    "method": "PUT",
                    "upload_url": format!("upload://{}", chunk.key),
                    "headers": {
                        "x-runmat-hash": chunk.hash,
                    }
                })
            })
            .collect::<Vec<_>>();
        Ok(Json(serde_json::json!({ "targets": targets })))
    }

    /// Records the key of an uploaded chunk. Rejects empty bodies and requests that are
    /// missing the hash header.
    async fn upload_handler(
        State(harness): State<UploadHarness>,
        Query(query): Query<UploadChunkQuery>,
        headers: HeaderMap,
        body: axum::body::Bytes,
    ) -> Result<(), StatusCode> {
        if body.is_empty() {
            return Err(StatusCode::BAD_REQUEST);
        }
        if headers.get("x-runmat-hash").is_none() {
            return Err(StatusCode::BAD_REQUEST);
        }
        let mut guard = harness.uploads.lock().expect("uploads lock poisoned");
        guard.push(query.key);
        Ok(())
    }

    /// Starts the upload server on an ephemeral localhost port.
    ///
    /// Returns its base URL, the recorded-keys list, the runtime that drives it, and a
    /// sender that triggers graceful shutdown.
    fn spawn_upload_server() -> (
        String,
        Arc<Mutex<Vec<String>>>,
        Runtime,
        oneshot::Sender<()>,
    ) {
        let harness = UploadHarness::default();
        let uploads = Arc::clone(&harness.uploads);
        let runtime = Runtime::new().expect("tokio runtime");
        let (addr, shutdown_tx) = runtime.block_on(async move {
            let listener = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
                .await
                .expect("bind upload server");
            let addr = listener.local_addr().expect("local addr");
            let app = Router::new()
                .route("/data/chunks/upload-targets", post(upload_targets_handler))
                .route("/upload", put(upload_handler))
                .with_state(harness);
            let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
            let server = axum::serve(listener, app).with_graceful_shutdown(async {
                let _ = shutdown_rx.await;
            });
            tokio::spawn(async move {
                let _ = server.await;
            });
            (addr, shutdown_tx)
        });
        (format!("http://{}", addr), uploads, runtime, shutdown_tx)
    }

    impl CountingDataUploadProvider {
        fn uploaded_keys(&self) -> Arc<Mutex<Vec<String>>> {
            Arc::clone(&self.uploaded_keys)
        }
    }

    #[async_trait(?Send)]
    impl FsProvider for CountingDataUploadProvider {
        fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
            self.inner.open(path, flags)
        }

        async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
            self.inner.read(path).await
        }

        async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
            self.inner.write(path, data).await
        }

        async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_file(path).await
        }

        async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.metadata(path).await
        }

        async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
            self.inner.symlink_metadata(path).await
        }

        async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
            self.inner.read_dir(path).await
        }

        async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
            self.inner.canonicalize(path).await
        }

        async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir(path).await
        }

        async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.create_dir_all(path).await
        }

        async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir(path).await
        }

        async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
            self.inner.remove_dir_all(path).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
            self.inner.set_readonly(path, readonly).await
        }

        async fn data_manifest_descriptor(
            &self,
            request: &DataManifestRequest,
        ) -> std::io::Result<DataManifestDescriptor> {
            self.inner.data_manifest_descriptor(request).await
        }

        async fn data_chunk_upload_targets(
            &self,
            request: &DataChunkUploadRequest,
        ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
            Ok(request
                .chunks
                .iter()
                .map(|chunk| DataChunkUploadTarget {
                    key: chunk.key.clone(),
                    method: "PUT".to_string(),
                    upload_url: format!("count://{}", chunk.object_id),
                    headers: std::collections::HashMap::new(),
                })
                .collect())
        }

        async fn data_upload_chunk(
            &self,
            target: &DataChunkUploadTarget,
            _data: &[u8],
        ) -> std::io::Result<()> {
            let mut guard = match self.uploaded_keys.lock() {
                Ok(guard) => guard,
                Err(poisoned) => poisoned.into_inner(),
            };
            guard.push(target.key.clone());
            Ok(())
        }
    }

    #[test]
    fn create_open_write_read_dataset() {
        let _serial = serial_test_guard();
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("sample.data").to_string_lossy().to_string();

        let ds = create_dataset(
            &path,
            single_array_schema("temperature", f64_array_meta(&[2.0, 2.0], None)),
        );
        let arr = dataset_array(ds, "temperature");
        let write_tensor = Tensor::new(vec![1.0, 2.0, 3.0, 4.0], vec![2, 2]).expect("write tensor");
        call_builtin(
            "DataArray.write",
            &[arr.clone(), Value::Tensor(write_tensor)],
        )
        .expect("write array");

        let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
        let Value::Tensor(t) = read_back else {
            panic!("expected tensor");
        };
        assert_eq!(t.shape, vec![2, 2]);
        assert_eq!(t.data, vec![1.0, 2.0, 3.0, 4.0]);
    }

    #[test]
    fn write_and_read_slice_payload() {
        let _serial = serial_test_guard();
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("slice.data").to_string_lossy().to_string();

        let ds = create_dataset(
            &path,
            single_array_schema("temperature", f64_array_meta(&[3.0, 3.0], None)),
        );
        let arr = dataset_array(ds, "temperature");

        let slice = slice_spec(vec![
            row_tensor(&[1.0, 2.0]),
            Value::String(":".to_string()),
        ]);
        let rhs = Value::Tensor(
            Tensor::new(vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0], vec![2, 3]).expect("rhs"),
        );
        call_builtin("DataArray.write", &[arr.clone(), slice.clone(), rhs]).expect("slice write");

        let read_back = call_builtin("DataArray.read", &[arr, slice]).expect("slice read");
        let Value::Tensor(t) = read_back else {
            panic!("expected tensor");
        };
        assert_eq!(t.shape, vec![2, 3]);
        assert_eq!(t.data, vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0]);
    }

    #[test]
    fn slice_write_updates_only_touched_chunks() {
        let _serial = serial_test_guard();
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir
            .path()
            .join("chunked.data")
            .to_string_lossy()
            .to_string();

        let arr = create_chunked_temperature(&path);

        let root = std::path::PathBuf::from(&path);
        let untouched_path = root.join("arrays/temperature/chunks/obj_1_1.json");
        let touched_path = root.join("arrays/temperature/chunks/obj_0_0.json");
        let untouched_before = read_bytes(&untouched_path);
        let touched_before = read_bytes(&touched_path);

        let slice = slice_spec(vec![row_tensor(&[1.0, 2.0]), row_tensor(&[1.0, 2.0])]);
        let rhs =
            Value::Tensor(Tensor::new(vec![99.0, 98.0, 97.0, 96.0], vec![2, 2]).expect("rhs"));
        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");

        assert_eq!(untouched_before, read_bytes(&untouched_path));
        assert_ne!(touched_before, read_bytes(&touched_path));
    }

    #[test]
    fn slice_write_uploads_only_touched_chunk_targets() {
        let _serial = serial_test_guard();
        let provider = Arc::new(CountingDataUploadProvider::default());
        let uploaded = provider.uploaded_keys();
        let _guard = runmat_filesystem::replace_provider(provider);

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir
            .path()
            .join("remote-chunked.data")
            .to_string_lossy()
            .to_string();

        let arr = create_chunked_temperature(&path);

        let manifest =
            futures::executor::block_on(crate::data::read_manifest_async(&dataset_root(&path)))
                .expect("manifest after initial write");
        let meta = manifest
            .arrays
            .get("temperature")
            .expect("temperature meta");
        let chunk_index_path =
            dataset_root(&path).join(meta.chunk_index_path.clone().expect("chunk index path"));
        assert!(
            futures::executor::block_on(runmat_filesystem::metadata_async(&chunk_index_path))
                .is_ok()
        );

        clear_recorded(&uploaded);

        let slice = slice_spec(vec![row_tensor(&[1.0, 2.0]), row_tensor(&[1.0, 2.0])]);
        let rhs = Value::Tensor(Tensor::new(vec![9.0, 8.0, 7.0, 6.0], vec![2, 2]).expect("rhs"));
        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");

        // Exactly one upload: the write must not re-upload chunks it did not touch.
        let keys = uploaded.lock().expect("uploaded keys lock");
        assert_eq!(keys.as_slice(), ["0.0".to_string()].as_slice());
    }

    #[test]
    fn slice_write_uploads_expected_cross_boundary_chunk_targets() {
        let _serial = serial_test_guard();
        let provider = Arc::new(CountingDataUploadProvider::default());
        let uploaded = provider.uploaded_keys();
        let _guard = runmat_filesystem::replace_provider(provider);

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir
            .path()
            .join("remote-chunked-boundary.data")
            .to_string_lossy()
            .to_string();

        let arr = create_chunked_temperature(&path);
        clear_recorded(&uploaded);

        let slice = slice_spec(vec![row_tensor(&[2.0, 3.0]), row_tensor(&[2.0, 3.0])]);
        let rhs =
            Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");

        assert_eq!(
            sorted_unique_keys(&uploaded),
            expected_keys(&ALL_FOUR_CHUNK_KEYS)
        );
    }

    #[test]
    fn slice_write_hits_http_server_data_endpoints_with_expected_keys() {
        let _serial = serial_test_guard();
        let (base_url, uploads, runtime, shutdown_tx) = spawn_upload_server();
        let provider = Arc::new(HttpDataUploadProvider::new(base_url));
        let _guard = runmat_filesystem::replace_provider(provider);

        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir
            .path()
            .join("http-endpoint.data")
            .to_string_lossy()
            .to_string();

        let arr = create_chunked_temperature(&path);
        clear_recorded(&uploads);

        let slice = slice_spec(vec![row_tensor(&[2.0, 3.0]), row_tensor(&[2.0, 3.0])]);
        let rhs =
            Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
        call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");

        assert_eq!(
            sorted_unique_keys(&uploads),
            expected_keys(&ALL_FOUR_CHUNK_KEYS)
        );

        let _ = shutdown_tx.send(());
        drop(runtime);
    }

    #[test]
    fn tx_create_resize_fill_and_delete_array() {
        let _serial = serial_test_guard();
        let dir = tempfile::tempdir().expect("tempdir");
        let path = dir.path().join("tx-ops.data").to_string_lossy().to_string();

        let ds = create_dataset(
            &path,
            single_array_schema("base", f64_array_meta(&[1.0, 1.0], None)),
        );

        let tx = call_builtin("Dataset.begin", &[ds]).expect("begin tx");
        call_builtin(
            "DataTransaction.create_array",
            &[
                tx.clone(),
                Value::String("new_array".to_string()),
                Value::Struct(f64_array_meta(&[2.0, 2.0], None)),
            ],
        )
        .expect("create array in tx");
        call_builtin(
            "DataTransaction.resize",
            &[
                tx.clone(),
                Value::String("new_array".to_string()),
                row_tensor(&[3.0, 1.0]),
            ],
        )
        .expect("resize array in tx");
        call_builtin(
            "DataTransaction.fill",
            &[
                tx.clone(),
                Value::String("new_array".to_string()),
                Value::Num(7.0),
            ],
        )
        .expect("fill array in tx");
        call_builtin(
            "DataTransaction.delete_array",
            &[tx.clone(), Value::String("base".to_string())],
        )
        .expect("delete array in tx");
        call_builtin("DataTransaction.commit", &[tx]).expect("commit tx");

        let ds = call_builtin("data.open", &[Value::String(path), no_options()])
            .expect("open dataset");
        let has_base = call_builtin(
            "Dataset.has_array",
            &[ds.clone(), Value::String("base".to_string())],
        )
        .expect("has base");
        assert_eq!(has_base, Value::Bool(false));
        let arr = dataset_array(ds, "new_array");
        let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
        let Value::Tensor(t) = read_back else {
            panic!("expected tensor");
        };
        assert_eq!(t.shape, vec![3, 1]);
        assert_eq!(t.data, vec![7.0, 7.0, 7.0]);
    }
}