use std::collections::BTreeMap;
use std::collections::HashMap;
use std::path::PathBuf;

use runmat_builtins::{ObjectInstance, StructValue, Tensor, Value};
use runmat_filesystem::data_contract::{DataChunkDescriptor, DataChunkUploadRequest};
use runmat_macros::runtime_builtin;

use crate::builtins::common::spec::{
    BroadcastSemantics, BuiltinFusionSpec, BuiltinGpuSpec, ConstantStrategy, GpuOpKind,
    ReductionNaN, ResidencyPolicy, ShapeRequirements,
};
use crate::data::{
    array_object, data_error, dataset_object, dataset_root, ensure_manifest_sequence,
    get_object_prop, manifest_path, manifest_version_token, now_rfc3339, parse_schema,
    parse_string, read_array_payload_async, read_manifest_async, remove_tx, sha256_hex, start_tx,
    transaction_object, with_tx, with_tx_mut, write_array_payload_async, write_manifest_async,
    DataArrayMeta, DataArrayPayload, DataChunkIndex, DataChunkIndexEntry, DataManifest,
    PendingCreateArray, PendingFill, PendingResize, PendingWrite, TxnStatus,
};
use crate::{make_cell, BuiltinResult};

#[runmat_macros::register_gpu_spec(builtin_path = "crate::builtins::io::data")]
pub const GPU_SPEC: BuiltinGpuSpec = BuiltinGpuSpec {
    name: "data.*",
    op_kind: GpuOpKind::Custom("io-data"),
    supported_precisions: &[],
    broadcast: BroadcastSemantics::None,
    provider_hooks: &[],
    constant_strategy: ConstantStrategy::InlineLiteral,
    residency: ResidencyPolicy::GatherImmediately,
    nan_mode: ReductionNaN::Include,
    two_pass_threshold: None,
    workgroup_size: None,
    accepts_nan_mode: false,
    notes: "Dataset operations are host I/O and metadata orchestration.",
};

#[runmat_macros::register_fusion_spec(builtin_path = "crate::builtins::io::data")]
pub const FUSION_SPEC: BuiltinFusionSpec = BuiltinFusionSpec {
    name: "data.*",
    shape: ShapeRequirements::Any,
    constant_strategy: ConstantStrategy::InlineLiteral,
    elementwise: None,
    reduction: None,
    emits_nan: false,
    notes: "Data builtins are side-effecting and not fusible.",
};
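
// Minimal usage sketch of the dataset surface registered in this module, assuming the
// script-level names map one-to-one onto the builtins below (schema contents are
// illustrative only):
//
//   ds = data.create("metrics.data", schema);   % create a typed dataset on disk
//   ds = data.open("metrics.data");             % reopen an existing dataset
//   names = ds.arrays();                        % cell array of array names
//   t = ds.array("temps").read();               % read one array back as a tensor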

#[runtime_builtin(
    name = "data.create",
    category = "io/data",
    summary = "Create a typed dataset at a .data path.",
    keywords = "data,dataset,create,persistence",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_create_builtin(
    path: Value,
    schema: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.create path")?;
    let root = dataset_root(&path);
    let schema = parse_schema(&schema)?;
    let now = now_rfc3339();
    let mut arrays = BTreeMap::new();
    for (name, mut meta) in schema.arrays {
        let total = meta.shape.iter().copied().product::<usize>();
        let payload = DataArrayPayload {
            dtype: meta.dtype.clone(),
            shape: meta.shape.clone(),
            values: vec![0.0; total],
        };
        let (payload_path, chunk_index_path) =
            write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
        meta.data_path = make_rel_data_path(&root, &payload_path)?;
        meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
        arrays.insert(name, meta);
    }

    let manifest = DataManifest {
        schema_version: 1,
        format: "runmat-data".to_string(),
        dataset_id: crate::data::new_dataset_id(),
        name: root.file_name().map(|v| v.to_string_lossy().to_string()),
        created_at: now.clone(),
        updated_at: now,
        arrays,
        attrs: BTreeMap::new(),
        txn_sequence: 0,
    };
    write_manifest_async(&root, &manifest).await?;
    Ok(dataset_object(&path, &manifest))
}

#[runtime_builtin(
    name = "data.open",
    category = "io/data",
    summary = "Open a dataset handle from a .data path.",
    keywords = "data,dataset,open,persistence",
    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_open_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.open path")?;
    let root = dataset_root(&path);
    let manifest = read_manifest_async(&root).await?;
    let mut ds = dataset_object(&path, &manifest);
    hydrate_dataset_descriptor_async(&path, &mut ds).await;
    Ok(ds)
}

#[runtime_builtin(
    name = "data.exists",
    category = "io/data",
    summary = "Check if dataset exists.",
    keywords = "data,dataset,exists",
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_exists_builtin(path: Value) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.exists path")?;
    let root = dataset_root(&path);
    let exists = runmat_filesystem::metadata_async(manifest_path(&root))
        .await
        .is_ok();
    Ok(Value::Bool(exists))
}

#[runtime_builtin(
    name = "data.delete",
    category = "io/data",
    summary = "Delete a dataset path.",
    keywords = "data,dataset,delete",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_delete_builtin(path: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.delete path")?;
    let root = dataset_root(&path);
    runmat_filesystem::remove_dir_all_async(&root)
        .await
        .map_err(|err| {
            data_error(format!(
                "data.delete: failed to remove '{}': {err}",
                root.display()
            ))
        })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "data.copy",
    category = "io/data",
    summary = "Copy dataset to new path.",
    keywords = "data,dataset,copy",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_copy_builtin(
    from_path: Value,
    to_path: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let from = parse_string(&from_path, "data.copy fromPath")?;
    let to = parse_string(&to_path, "data.copy toPath")?;
    copy_dir_recursive(&dataset_root(&from), &dataset_root(&to)).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "data.move",
    category = "io/data",
    summary = "Move dataset to new path.",
    keywords = "data,dataset,move",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_move_builtin(
    from_path: Value,
    to_path: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let from = parse_string(&from_path, "data.move fromPath")?;
    let to = parse_string(&to_path, "data.move toPath")?;
    runmat_filesystem::rename_async(dataset_root(&from), dataset_root(&to))
        .await
        .map_err(|err| {
            data_error(format!(
                "data.move: failed to move dataset '{from}' -> '{to}': {err}"
            ))
        })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "data.import",
    category = "io/data",
    summary = "Import a dataset from an existing source path.",
    keywords = "data,dataset,import",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_import_builtin(
    path: Value,
    format: Value,
    source_path: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.import path")?;
    let format = parse_string(&format, "data.import format")?;
    if !format.eq_ignore_ascii_case("data") {
        return Err(data_error(
            "data.import currently supports only format='data'",
        ));
    }
    let source_path = parse_string(&source_path, "data.import sourcePath")?;
    copy_dir_recursive(&dataset_root(&source_path), &dataset_root(&path)).await?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    Ok(dataset_object(&path, &manifest))
}

#[runtime_builtin(
    name = "data.export",
    category = "io/data",
    summary = "Export dataset to target path.",
    keywords = "data,dataset,export",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_export_builtin(
    path: Value,
    format: Value,
    target_path: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.export path")?;
    let format = parse_string(&format, "data.export format")?;
    if !format.eq_ignore_ascii_case("data") {
        return Err(data_error(
            "data.export currently supports only format='data'",
        ));
    }
    let target_path = parse_string(&target_path, "data.export targetPath")?;
    copy_dir_recursive(&dataset_root(&path), &dataset_root(&target_path)).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "data.list",
    category = "io/data",
    summary = "List dataset paths under a prefix.",
    keywords = "data,dataset,list",
    type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_list_builtin(path_prefix: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
    let prefix = parse_string(&path_prefix, "data.list prefix")?;
    let root = PathBuf::from(prefix);
    let entries = runmat_filesystem::read_dir_async(&root)
        .await
        .map_err(|err| {
            data_error(format!(
                "data.list: failed to read '{}': {err}",
                root.display()
            ))
        })?;
    let mut values = Vec::new();
    for entry in entries {
        if !entry.is_dir() {
            continue;
        }
        let candidate = entry.path();
        if candidate.extension().and_then(|s| s.to_str()) != Some("data") {
            continue;
        }
        if runmat_filesystem::metadata_async(candidate.join("manifest.json"))
            .await
            .is_ok()
        {
            values.push(Value::String(candidate.to_string_lossy().to_string()));
        }
    }
    let cols = values.len();
    make_cell(values, 1, cols).map_err(data_error)
}

#[runtime_builtin(
    name = "data.inspect",
    category = "io/data",
    summary = "Inspect dataset metadata and schema fields.",
    keywords = "data,dataset,inspect,schema",
    type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_inspect_builtin(path: Value) -> BuiltinResult<Value> {
    let path = parse_string(&path, "data.inspect path")?;
    let root = dataset_root(&path);
    let manifest = read_manifest_async(&root).await?;
    let mut out = StructValue::new();
    out.fields.insert("path".to_string(), Value::String(path));
    out.fields
        .insert("id".to_string(), Value::String(manifest.dataset_id));
    out.fields.insert(
        "arrayCount".to_string(),
        Value::Num(manifest.arrays.len() as f64),
    );
    out.fields
        .insert("updatedAt".to_string(), Value::String(manifest.updated_at));
    Ok(Value::Struct(out))
}

#[runtime_builtin(
    name = "Dataset.path",
    category = "io/data",
    summary = "Return dataset path.",
    keywords = "dataset,path",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_path_builtin(base: Value) -> BuiltinResult<Value> {
    let obj = as_object(&base, "Dataset.path")?;
    Ok(get_object_prop(obj, "__data_path")?.clone())
}

#[runtime_builtin(
    name = "Dataset.id",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_id_builtin(base: Value) -> BuiltinResult<Value> {
    let obj = as_object(&base, "Dataset.id")?;
    Ok(get_object_prop(obj, "__data_id")?.clone())
}

#[runtime_builtin(
    name = "Dataset.version",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_version_builtin(base: Value) -> BuiltinResult<Value> {
    let obj = as_object(&base, "Dataset.version")?;
    Ok(get_object_prop(obj, "__data_version")?.clone())
}

#[runtime_builtin(
    name = "Dataset.arrays",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_cell_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_arrays_builtin(base: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.arrays")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let values: Vec<Value> = manifest
        .arrays
        .keys()
        .map(|k| Value::String(k.clone()))
        .collect();
    let cols = values.len();
    make_cell(values, 1, cols).map_err(data_error)
}

#[runtime_builtin(
    name = "Dataset.has_array",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_has_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.has_array")?;
    let name = parse_string(&name, "Dataset.has_array name")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    Ok(Value::Bool(manifest.arrays.contains_key(&name)))
}

#[runtime_builtin(
    name = "Dataset.array",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_array_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_array_builtin(base: Value, name: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.array")?;
    let name = parse_string(&name, "Dataset.array name")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    if !manifest.arrays.contains_key(&name) {
        return Err(data_error(format!(
            "Dataset.array: array '{name}' not found"
        )));
    }
    Ok(array_object(&path, &name))
}

#[runtime_builtin(
    name = "Dataset.attrs",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_struct_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_attrs_builtin(base: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.attrs")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    Ok(attrs_to_struct(&manifest.attrs))
}

#[runtime_builtin(
    name = "Dataset.get_attr",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_unknown_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_get_attr_builtin(
    base: Value,
    key: Value,
    rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.get_attr")?;
    let key = parse_string(&key, "Dataset.get_attr key")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    if let Some(value) = manifest.attrs.get(&key) {
        return Ok(json_to_value(value));
    }
    Ok(rest.first().cloned().unwrap_or(Value::Num(0.0)))
}

#[runtime_builtin(
    name = "Dataset.set_attr",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.set_attr")?;
    let key = parse_string(&key, "Dataset.set_attr key")?;
    let root = dataset_root(&path);
    let mut manifest = read_manifest_async(&root).await?;
    manifest.attrs.insert(key, value_to_json(&value));
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "Dataset.set_attrs",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.set_attrs")?;
    let Value::Struct(incoming) = attrs else {
        return Err(data_error("Dataset.set_attrs: attrs must be a struct"));
    };
    let root = dataset_root(&path);
    let mut manifest = read_manifest_async(&root).await?;
    for (k, v) in incoming.fields {
        manifest.attrs.insert(k, value_to_json(&v));
    }
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "Dataset.begin",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_tx_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_begin_builtin(base: Value, _rest: Vec<Value>) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.begin")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let tx_id = start_tx(path.clone(), manifest.txn_sequence);
    tracing::info!(
        target: "runmat.data",
        dataset = path,
        tx_id = tx_id,
        base_sequence = manifest.txn_sequence,
        "data transaction begin"
    );
    Ok(transaction_object(&path, &tx_id))
}
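
// Sketch of the staged-transaction flow, assuming script-level method names map onto
// the DataTransaction.* builtins below (values are illustrative only):
//
//   tx = ds.begin();
//   tx.write("temps", {1:100, ':'}, values);   % queued as a PendingWrite
//   tx.set_attr("source", "sensor-a");         % queued attribute update
//   tx.commit();                               % replays staged ops against the manifest
//
// Nothing touches the manifest until DataTransaction.commit runs; staged operations
// live in the transaction registry accessed through with_tx / with_tx_mut.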

#[runtime_builtin(
    name = "Dataset.snapshot",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_snapshot_builtin(
    base: Value,
    label: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.snapshot")?;
    let label = parse_string(&label, "Dataset.snapshot label")?;
    let root = dataset_root(&path);
    let snapshots = root.join(".snapshots");
    runmat_filesystem::create_dir_all_async(&snapshots)
        .await
        .map_err(|err| {
            data_error(format!(
                "Dataset.snapshot: failed to create snapshots dir: {err}"
            ))
        })?;
    let src = manifest_path(&root);
    let dst = snapshots.join(format!("{}.manifest.json", sanitize_label(&label)));
    copy_file(&src, &dst).await?;
    Ok(Value::String(dst.to_string_lossy().to_string()))
}

#[runtime_builtin(
    name = "Dataset.refresh",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_dataset_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn dataset_refresh_builtin(base: Value) -> BuiltinResult<Value> {
    let path = dataset_path_from_object(&base, "Dataset.refresh")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let mut ds = dataset_object(&path, &manifest);
    hydrate_dataset_descriptor_async(&path, &mut ds).await;
    Ok(ds)
}

#[runtime_builtin(
    name = "DataArray.name",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_name_builtin(base: Value) -> BuiltinResult<Value> {
    let obj = as_object(&base, "DataArray.name")?;
    Ok(get_object_prop(obj, "__array_name")?.clone())
}

#[runtime_builtin(
    name = "DataArray.dtype",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_dtype_builtin(base: Value) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.dtype")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let meta = manifest
        .arrays
        .get(&name)
        .ok_or_else(|| data_error(format!("DataArray.dtype: array '{name}' not found")))?;
    Ok(Value::String(meta.dtype.clone()))
}

#[runtime_builtin(
    name = "DataArray.shape",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_shape_builtin(base: Value) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.shape")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let meta = manifest
        .arrays
        .get(&name)
        .ok_or_else(|| data_error(format!("DataArray.shape: array '{name}' not found")))?;
    let values = meta.shape.iter().map(|v| *v as f64).collect::<Vec<_>>();
    let tensor = Tensor::new(values, vec![1, meta.shape.len()])
        .map_err(|err| data_error(format!("DataArray.shape: {err}")))?;
    Ok(Value::Tensor(tensor))
}

#[runtime_builtin(
    name = "DataArray.rank",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_int_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_rank_builtin(base: Value) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.rank")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let meta = manifest
        .arrays
        .get(&name)
        .ok_or_else(|| data_error(format!("DataArray.rank: array '{name}' not found")))?;
    Ok(Value::Num(meta.shape.len() as f64))
}

#[runtime_builtin(
    name = "DataArray.chunk_shape",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_shape_tensor_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_chunk_shape_builtin(base: Value) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.chunk_shape")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let meta = manifest
        .arrays
        .get(&name)
        .ok_or_else(|| data_error(format!("DataArray.chunk_shape: array '{name}' not found")))?;
    let values = meta
        .chunk_shape
        .iter()
        .map(|v| *v as f64)
        .collect::<Vec<_>>();
    let tensor = Tensor::new(values, vec![1, meta.chunk_shape.len()])
        .map_err(|err| data_error(format!("DataArray.chunk_shape: {err}")))?;
    Ok(Value::Tensor(tensor))
}

#[runtime_builtin(
    name = "DataArray.codec",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_codec_builtin(base: Value) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.codec")?;
    let manifest = read_manifest_async(&dataset_root(&path)).await?;
    let meta = manifest
        .arrays
        .get(&name)
        .ok_or_else(|| data_error(format!("DataArray.codec: array '{name}' not found")))?;
    Ok(Value::String(meta.codec.clone()))
}

#[runtime_builtin(
    name = "DataArray.read",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_tensor_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_read_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.read")?;
    let root = dataset_root(&path);
    let manifest = read_manifest_async(&root).await?;
    let meta = manifest
        .arrays
        .get(&name)
        .ok_or_else(|| data_error(format!("DataArray.read: array '{name}' not found")))?;
    let payload = read_array_payload_async(&root, meta).await?;
    let sliced = if let Some(slice_spec) = rest.first() {
        read_slice_payload(&payload, slice_spec)?
    } else {
        payload
    };
    let tensor = Tensor::new(sliced.values, sliced.shape)
        .map_err(|err| data_error(format!("DataArray.read: {err}")))?;
    Ok(Value::Tensor(tensor))
}

#[runtime_builtin(
    name = "DataArray.write",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_write_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.write")?;
    let (slice_spec, value) = match rest.as_slice() {
        [v] => (None, v),
        [slice, v] => (Some(slice), v),
        _ => {
            return Err(data_error(
                "DataArray.write expects values or (sliceSpec, values) arguments",
            ))
        }
    };
    write_array_full_async(&path, &name, slice_spec, value).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataArray.resize",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_resize_builtin(
    base: Value,
    new_shape: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.resize")?;
    let shape = parse_shape_from_value(&new_shape)?;
    let root = dataset_root(&path);
    let mut manifest = read_manifest_async(&root).await?;
    let meta = manifest
        .arrays
        .get_mut(&name)
        .ok_or_else(|| data_error(format!("DataArray.resize: array '{name}' not found")))?;
    meta.shape = shape.clone();
    let payload = DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: shape.clone(),
        values: vec![0.0; shape.iter().copied().product()],
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
    meta.data_path = make_rel_data_path(&root, &payload_path)?;
    meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataArray.fill",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_array_fill_builtin(
    base: Value,
    value: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let (path, name) = array_identity(&base, "DataArray.fill")?;
    let root = dataset_root(&path);
    let mut manifest = read_manifest_async(&root).await?;
    let meta = manifest
        .arrays
        .get_mut(&name)
        .ok_or_else(|| data_error(format!("DataArray.fill: array '{name}' not found")))?;
    let scalar = scalar_to_f64(&value)?;
    let payload = DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: meta.shape.clone(),
        values: vec![scalar; meta.shape.iter().copied().product()],
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(&root, &name, &payload, &meta.chunk_shape).await?;
    meta.data_path = make_rel_data_path(&root, &payload_path)?;
    meta.chunk_index_path = Some(make_rel_data_path(&root, &chunk_index_path)?);
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.id",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_id_builtin(base: Value) -> BuiltinResult<Value> {
    let obj = as_object(&base, "DataTransaction.id")?;
    Ok(get_object_prop(obj, "__tx_id")?.clone())
}

#[runtime_builtin(
    name = "DataTransaction.write",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_write_builtin(
    base: Value,
    array_name: Value,
    slice: Value,
    values: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.write")?;
    let array_name = parse_string(&array_name, "DataTransaction.write arrayName")?;
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error("DataTransaction.write: transaction is not open"));
        }
        tx.writes.push(PendingWrite {
            array: array_name,
            slice_spec: Some(slice),
            value: values,
        });
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.set_attr",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_set_attr_builtin(base: Value, key: Value, value: Value) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.set_attr")?;
    let key = parse_string(&key, "DataTransaction.set_attr key")?;
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error(
                "DataTransaction.set_attr: transaction is not open",
            ));
        }
        tx.attrs.insert(key, value);
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.set_attrs",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_set_attrs_builtin(base: Value, attrs: Value) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.set_attrs")?;
    let Value::Struct(incoming) = attrs else {
        return Err(data_error(
            "DataTransaction.set_attrs: attrs must be struct",
        ));
    };
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error(
                "DataTransaction.set_attrs: transaction is not open",
            ));
        }
        for (k, v) in incoming.fields {
            tx.attrs.insert(k, v);
        }
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.resize",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_resize_builtin(
    base: Value,
    array_name: Value,
    new_shape: Value,
    _rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.resize")?;
    let array_name = parse_string(&array_name, "DataTransaction.resize arrayName")?;
    let shape = parse_shape_from_value(&new_shape)?;
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error(
                "DataTransaction.resize: transaction is not open",
            ));
        }
        tx.resizes.push(PendingResize {
            array: array_name,
            shape,
        });
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.fill",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_fill_builtin(
    base: Value,
    array_name: Value,
    value: Value,
    rest: Vec<Value>,
) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.fill")?;
    let array_name = parse_string(&array_name, "DataTransaction.fill arrayName")?;
    let slice_spec = rest.first().cloned();
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error("DataTransaction.fill: transaction is not open"));
        }
        tx.fills.push(PendingFill {
            array: array_name,
            slice_spec,
            value,
        });
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.delete_array",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_delete_array_builtin(base: Value, array_name: Value) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.delete_array")?;
    let array_name = parse_string(&array_name, "DataTransaction.delete_array arrayName")?;
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error(
                "DataTransaction.delete_array: transaction is not open",
            ));
        }
        tx.delete_arrays.push(array_name);
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.create_array",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_create_array_builtin(
    base: Value,
    array_name: Value,
    meta: Value,
) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.create_array")?;
    let array_name = parse_string(&array_name, "DataTransaction.create_array arrayName")?;
    let meta = parse_array_meta(&array_name, &meta)?;
    with_tx_mut(&tx_id, |tx| {
        if tx.status != TxnStatus::Open {
            return Err(data_error(
                "DataTransaction.create_array: transaction is not open",
            ));
        }
        tx.create_arrays.push(PendingCreateArray {
            array: array_name,
            meta,
        });
        Ok(())
    })?;
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.commit",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_commit_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.commit")?;
    let (dataset_path, base_sequence, writes, resizes, fills, create_arrays, delete_arrays, attrs) =
        with_tx(&tx_id, |tx| {
            if tx.status != TxnStatus::Open {
                return Err(data_error(
                    "DataTransaction.commit: transaction is not open",
                ));
            }
            Ok((
                tx.dataset_path.clone(),
                tx.base_sequence,
                tx.writes.clone(),
                tx.resizes.clone(),
                tx.fills.clone(),
                tx.create_arrays.clone(),
                tx.delete_arrays.clone(),
                tx.attrs.clone(),
            ))
        })?;

    let write_ops = writes.len();
    let resize_ops = resizes.len();
    let fill_ops = fills.len();
    let create_ops = create_arrays.len();
    let delete_ops = delete_arrays.len();
    let attr_updates = attrs.len();

    let root = dataset_root(&dataset_path);
    let mut manifest = read_manifest_async(&root).await?;
    ensure_manifest_sequence(base_sequence, &manifest)?;
    if let Some(Value::Struct(options)) = rest.first() {
        if let Some(expected) = options.fields.get("if_manifest") {
            let expected = parse_string(expected, "DataTransaction.commit if_manifest")?;
            let actual = manifest_version_token(&manifest);
            if expected != actual {
                tracing::warn!(
                    target: "runmat.data",
                    tx_id = tx_id,
                    expected_manifest = expected,
                    actual_manifest = actual,
                    "data transaction manifest conflict"
                );
                return Err(data_error(
                    "MANIFEST_CONFLICT: if_manifest precondition failed",
                ));
            }
        }
    }
    for create in create_arrays {
        create_array_in_manifest(&root, &mut manifest, &create.array, create.meta).await?;
    }
    for resize in resizes {
        resize_array_in_manifest(&root, &mut manifest, &resize.array, resize.shape).await?;
    }
    for fill in fills {
        fill_array_in_manifest(
            &root,
            &mut manifest,
            &fill.array,
            fill.slice_spec.as_ref(),
            &fill.value,
        )
        .await?;
    }
    for write in writes {
        apply_write_to_manifest_async(
            &root,
            &mut manifest,
            &write.array,
            write.slice_spec.as_ref(),
            &write.value,
        )
        .await?;
    }
    for array_name in delete_arrays {
        delete_array_in_manifest_async(&root, &mut manifest, &array_name).await?;
    }
    for (k, v) in attrs {
        manifest.attrs.insert(k, value_to_json(&v));
    }
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await?;
    with_tx_mut(&tx_id, |tx| {
        tx.status = TxnStatus::Committed;
        Ok(())
    })?;
    tracing::info!(
        target: "runmat.data",
        dataset = dataset_path,
        tx_id = tx_id,
        write_ops = write_ops,
        resize_ops = resize_ops,
        fill_ops = fill_ops,
        create_ops = create_ops,
        delete_ops = delete_ops,
        attr_updates = attr_updates,
        next_sequence = manifest.txn_sequence,
        "data transaction commit"
    );
    remove_tx(&tx_id);
    Ok(Value::Bool(true))
}
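
// Optimistic-concurrency note (a sketch, not additional behavior): commit accepts an
// optional options struct whose `if_manifest` field carries the manifest version token
// the caller expects (compared against manifest_version_token above). If the token no
// longer matches, the commit fails with MANIFEST_CONFLICT instead of clobbering
// concurrent changes. Illustrative script-level call, assuming `expected_token` was
// captured earlier by the caller:
//
//   tx.commit(struct("if_manifest", expected_token));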

#[runtime_builtin(
    name = "commit",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_commit_alias_builtin(base: Value, rest: Vec<Value>) -> BuiltinResult<Value> {
    match &base {
        Value::Object(obj) if obj.class_name == "DataTransaction" => {
            data_tx_commit_builtin(base, rest).await
        }
        Value::HandleObject(handle) if handle.class_name == "DataTransaction" => {
            data_tx_commit_builtin(base, rest).await
        }
        _ => Err(data_error(
            "commit: receiver must be a DataTransaction (use tx = ds.begin())",
        )),
    }
}

#[runtime_builtin(
    name = "DataTransaction.abort",
    category = "io/data",
    sink = true,
    type_resolver(crate::builtins::io::type_resolvers::data_bool_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_abort_builtin(base: Value) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.abort")?;
    with_tx_mut(&tx_id, |tx| {
        tx.status = TxnStatus::Aborted;
        Ok(())
    })?;
    tracing::info!(
        target: "runmat.data",
        tx_id = tx_id,
        "data transaction abort"
    );
    remove_tx(&tx_id);
    Ok(Value::Bool(true))
}

#[runtime_builtin(
    name = "DataTransaction.status",
    category = "io/data",
    type_resolver(crate::builtins::io::type_resolvers::data_string_type),
    builtin_path = "crate::builtins::io::data"
)]
async fn data_tx_status_builtin(base: Value) -> BuiltinResult<Value> {
    let tx_id = tx_id_from_object(&base, "DataTransaction.status")?;
    with_tx(&tx_id, |tx| {
        let status = match tx.status {
            TxnStatus::Open => "open",
            TxnStatus::Committed => "committed",
            TxnStatus::Aborted => "aborted",
        };
        Ok(Value::String(status.to_string()))
    })
}

fn dataset_path_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
    let obj = as_object(base, context)?;
    parse_string(get_object_prop(obj, "__data_path")?, context)
}

fn tx_id_from_object(base: &Value, context: &str) -> BuiltinResult<String> {
    let obj = as_object(base, context)?;
    parse_string(get_object_prop(obj, "__tx_id")?, context)
}

fn array_identity(base: &Value, context: &str) -> BuiltinResult<(String, String)> {
    let obj = as_object(base, context)?;
    let path = parse_string(get_object_prop(obj, "__data_path")?, context)?;
    let name = parse_string(get_object_prop(obj, "__array_name")?, context)?;
    Ok((path, name))
}

fn as_object<'a>(value: &'a Value, context: &str) -> BuiltinResult<&'a ObjectInstance> {
    match value {
        Value::Object(obj) => Ok(obj),
        _ => Err(data_error(format!("{context}: expected object receiver"))),
    }
}

async fn hydrate_dataset_descriptor_async(path: &str, dataset: &mut Value) {
    let request = runmat_filesystem::data_contract::DataManifestRequest {
        path: path.to_string(),
        version: None,
    };
    let descriptor = match runmat_filesystem::data_manifest_descriptor_async(&request).await {
        Ok(descriptor) => descriptor,
        Err(_) => return,
    };
    let Value::Object(obj) = dataset else {
        return;
    };
    if !descriptor.dataset_id.is_empty() {
        obj.properties.insert(
            "__data_id".to_string(),
            Value::String(descriptor.dataset_id),
        );
    }
    obj.properties.insert(
        "__data_version".to_string(),
        Value::String(format!(
            "{}:{}",
            descriptor.updated_at, descriptor.txn_sequence
        )),
    );
}

fn sanitize_label(label: &str) -> String {
    label
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
                ch
            } else {
                '_'
            }
        })
        .collect()
}

async fn copy_file(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
    let bytes = runmat_filesystem::read_async(src)
        .await
        .map_err(|err| data_error(format!("failed to open '{}': {err}", src.display())))?;
    let parent = dst.parent().ok_or_else(|| {
        data_error(format!(
            "invalid destination path '{}': missing parent",
            dst.display()
        ))
    })?;
    runmat_filesystem::create_dir_all_async(parent)
        .await
        .map_err(|err| data_error(format!("failed to create '{}': {err}", parent.display())))?;
    runmat_filesystem::write_async(dst, &bytes)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to copy '{}' -> '{}': {err}",
                src.display(),
                dst.display()
            ))
        })?;
    Ok(())
}

fn make_rel_data_path(
    root: &std::path::Path,
    payload_path: &std::path::Path,
) -> BuiltinResult<String> {
    let rel = payload_path
        .strip_prefix(root)
        .map_err(|err| data_error(format!("failed to compute relative data path: {err}")))?;
    Ok(rel.to_string_lossy().to_string())
}

async fn create_array_in_manifest(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    mut meta: DataArrayMeta,
) -> BuiltinResult<()> {
    if manifest.arrays.contains_key(array_name) {
        return Err(data_error(format!(
            "DataTransaction.create_array: array '{array_name}' already exists"
        )));
    }
    let payload = DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: meta.shape.clone(),
        values: vec![0.0; meta.shape.iter().copied().product()],
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
    meta.data_path = make_rel_data_path(root, &payload_path)?;
    meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
    manifest.arrays.insert(array_name.to_string(), meta);
    Ok(())
}

async fn resize_array_in_manifest(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    shape: Vec<usize>,
) -> BuiltinResult<()> {
    let meta = manifest
        .arrays
        .get_mut(array_name)
        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
    meta.shape = shape.clone();
    let payload = DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: shape.clone(),
        values: vec![0.0; shape.iter().copied().product()],
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(root, array_name, &payload, &meta.chunk_shape).await?;
    meta.data_path = make_rel_data_path(root, &payload_path)?;
    meta.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
    Ok(())
}

async fn fill_array_in_manifest(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    slice_spec: Option<&Value>,
    value: &Value,
) -> BuiltinResult<()> {
    let meta: DataArrayMeta = manifest
        .arrays
        .get(array_name)
        .cloned()
        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;
    let scalar = scalar_to_f64(value)?;
    let payload = read_array_payload_async(root, &meta).await?;
    let next_payload = if let Some(slice_spec) = slice_spec {
        let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
        let target_shape: Vec<usize> = ranges
            .iter()
            .map(|r| r.end.saturating_sub(r.start))
            .collect();
        let rhs = Value::Tensor(
            Tensor::new(
                vec![scalar; target_shape.iter().copied().product()],
                target_shape,
            )
            .map_err(|err| data_error(format!("DataTransaction.fill: {err}")))?,
        );
        write_slice_payload(&payload, slice_spec, &rhs)?
    } else {
        DataArrayPayload {
            dtype: payload.dtype,
            shape: payload.shape.clone(),
            values: vec![scalar; payload.shape.iter().copied().product()],
        }
    };
    let (payload_path, chunk_index_path) =
        write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
    if let Some(updated) = manifest.arrays.get_mut(array_name) {
        updated.shape = next_payload.shape.clone();
        updated.data_path = make_rel_data_path(root, &payload_path)?;
        updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
    }
    Ok(())
}

async fn delete_array_in_manifest_async(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
) -> BuiltinResult<()> {
    let removed = manifest.arrays.remove(array_name);
    if removed.is_none() {
        return Err(data_error(format!(
            "DataTransaction.delete_array: array '{array_name}' not found"
        )));
    }
    let array_dir = root.join("arrays").join(array_name);
    if runmat_filesystem::metadata_async(&array_dir).await.is_ok() {
        runmat_filesystem::remove_dir_all_async(&array_dir)
            .await
            .map_err(|err| {
                data_error(format!(
                    "DataTransaction.delete_array: failed to remove '{}': {err}",
                    array_dir.display()
                ))
            })?;
    }
    Ok(())
}

fn parse_array_meta(array_name: &str, meta: &Value) -> BuiltinResult<DataArrayMeta> {
    let Value::Struct(meta_struct) = meta else {
        return Err(data_error(
            "DataTransaction.create_array: meta must be a struct",
        ));
    };
    let dtype = meta_struct
        .fields
        .get("dtype")
        .map(|v| parse_string(v, "DataTransaction.create_array dtype"))
        .transpose()?
        .unwrap_or_else(|| "f64".to_string());
    let shape = meta_struct
        .fields
        .get("shape")
        .map(parse_shape_from_value)
        .transpose()?
        .unwrap_or_else(|| vec![0, 0]);
    let chunk_shape = meta_struct
        .fields
        .get("chunk")
        .map(parse_shape_from_value)
        .transpose()?
        .unwrap_or_else(|| default_chunk_shape(&shape));
    let codec = meta_struct
        .fields
        .get("codec")
        .map(|v| parse_string(v, "DataTransaction.create_array codec"))
        .transpose()?
        .unwrap_or_else(|| "zstd".to_string());
    Ok(DataArrayMeta {
        dtype,
        shape,
        chunk_shape,
        order: "column_major".to_string(),
        codec,
        chunk_index_path: Some(format!("arrays/{array_name}/chunks/index.json")),
        data_path: format!("arrays/{array_name}/data.f64.json"),
    })
}

fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    if shape.is_empty() {
        return vec![1024];
    }
    let mut out = shape.to_vec();
    if out.len() == 1 {
        out[0] = out[0].clamp(1, 65_536);
        return out;
    }
    out[0] = out[0].clamp(1, 256);
    out[1] = out[1].clamp(1, 256);
    for dim in out.iter_mut().skip(2) {
        *dim = (*dim).clamp(1, 8);
    }
    out
}
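
// Worked examples of the clamping above (inputs are illustrative):
//   default_chunk_shape(&[10_000, 3, 4]) == [256, 3, 4]
//   default_chunk_shape(&[100_000])      == [65_536]
//   default_chunk_shape(&[])             == [1024]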

#[async_recursion::async_recursion(?Send)]
async fn copy_dir_recursive(src: &PathBuf, dst: &PathBuf) -> BuiltinResult<()> {
    let metadata = runmat_filesystem::metadata_async(src)
        .await
        .map_err(|err| data_error(format!("failed to stat '{}': {err}", src.display())))?;
    if !metadata.is_dir() {
        return Err(data_error(format!(
            "expected dataset directory at '{}'",
            src.display()
        )));
    }
    runmat_filesystem::create_dir_all_async(dst)
        .await
        .map_err(|err| data_error(format!("failed to create '{}': {err}", dst.display())))?;
    for entry in runmat_filesystem::read_dir_async(src)
        .await
        .map_err(|err| data_error(format!("failed to read '{}': {err}", src.display())))?
    {
        let entry_src = entry.path().to_path_buf();
        let entry_dst = dst.join(entry.file_name());
        if entry.is_dir() {
            copy_dir_recursive(&entry_src, &entry_dst).await?;
            continue;
        }
        copy_file(&entry_src, &entry_dst).await?;
    }
    Ok(())
}

fn parse_shape_from_value(value: &Value) -> BuiltinResult<Vec<usize>> {
    match value {
        Value::Tensor(t) => {
            let mut out = Vec::with_capacity(t.data.len());
            for v in &t.data {
                if !v.is_finite() || *v < 0.0 {
                    return Err(data_error(
                        "shape dimensions must be non-negative finite numbers",
                    ));
                }
                out.push(*v as usize);
            }
            Ok(out)
        }
        Value::Num(v) => {
            if !v.is_finite() || *v < 0.0 {
                return Err(data_error(
                    "shape dimensions must be non-negative finite numbers",
                ));
            }
            Ok(vec![*v as usize])
        }
        Value::Int(v) => {
            let n = v.to_i64();
            if n < 0 {
                return Err(data_error("shape dimensions must be non-negative"));
            }
            Ok(vec![n as usize])
        }
        _ => Err(data_error("shape must be a numeric vector")),
    }
}

fn scalar_to_f64(value: &Value) -> BuiltinResult<f64> {
    match value {
        Value::Num(v) => Ok(*v),
        Value::Int(v) => Ok(v.to_i64() as f64),
        _ => Err(data_error("expected numeric scalar")),
    }
}

async fn write_array_full_async(
    dataset_path: &str,
    array_name: &str,
    slice_spec: Option<&Value>,
    value: &Value,
) -> BuiltinResult<()> {
    let root = dataset_root(dataset_path);
    let mut manifest = read_manifest_async(&root).await?;
    apply_write_to_manifest_async(&root, &mut manifest, array_name, slice_spec, value).await?;
    manifest.updated_at = now_rfc3339();
    manifest.txn_sequence = manifest.txn_sequence.saturating_add(1);
    write_manifest_async(&root, &manifest).await
}

async fn apply_write_to_manifest_async(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    slice_spec: Option<&Value>,
    value: &Value,
) -> BuiltinResult<()> {
    let meta: DataArrayMeta = manifest
        .arrays
        .get(array_name)
        .cloned()
        .ok_or_else(|| data_error(format!("array '{array_name}' not found")))?;

    if let Some(slice_spec) = slice_spec {
        if apply_slice_write_chunked_async(root, manifest, array_name, &meta, slice_spec, value)
            .await?
        {
            return Ok(());
        }
    }

    let payload = read_array_payload_async(root, &meta).await?;
    let mut next_payload = payload.clone();
    if let Some(slice_spec) = slice_spec {
        next_payload = write_slice_payload(&payload, slice_spec, value)?;
    } else {
        let (shape, values) = value_to_tensor_shape_values(value)?;
        next_payload.shape = shape;
        next_payload.values = values;
    }

    let (payload_path, chunk_index_path) =
        write_array_payload_async(root, array_name, &next_payload, &meta.chunk_shape).await?;
    if let Some(updated) = manifest.arrays.get_mut(array_name) {
        updated.shape = next_payload.shape.clone();
        updated.data_path = make_rel_data_path(root, &payload_path)?;
        updated.chunk_index_path = Some(make_rel_data_path(root, &chunk_index_path)?);
    }
    Ok(())
}

async fn apply_slice_write_chunked_async(
    root: &std::path::Path,
    manifest: &mut DataManifest,
    array_name: &str,
    meta: &DataArrayMeta,
    slice_spec: &Value,
    value: &Value,
) -> BuiltinResult<bool> {
    let Some(index_rel_path) = &meta.chunk_index_path else {
        return Ok(false);
    };
    let index_path = root.join(index_rel_path);
    if runmat_filesystem::metadata_async(&index_path)
        .await
        .is_err()
    {
        return Ok(false);
    }
    let ranges = parse_slice_spec(slice_spec, &meta.shape)?;
    let rhs_shape: Vec<usize> = ranges
        .iter()
        .map(|r| r.end.saturating_sub(r.start))
        .collect();
    let (actual_rhs_shape, rhs_values) = value_to_tensor_shape_values(value)?;
    if actual_rhs_shape != rhs_shape {
        return Err(data_error(format!(
            "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
            actual_rhs_shape, rhs_shape
        )));
    }

    let index_bytes = runmat_filesystem::read_async(&index_path)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to read chunk index '{}': {err}",
                index_path.display()
            ))
        })?;
    let mut chunk_index: DataChunkIndex = serde_json::from_slice(&index_bytes).map_err(|err| {
        data_error(format!(
            "failed to parse chunk index '{}': {err}",
            index_path.display()
        ))
    })?;

    let mut pos_by_key = HashMap::new();
    for (idx, entry) in chunk_index.chunks.iter().enumerate() {
        pos_by_key.insert(entry.key.clone(), idx);
    }
    let touched = touched_chunk_coords(&ranges, &meta.chunk_shape, &meta.shape);
    let mut upload_batch = Vec::<(DataChunkDescriptor, Vec<u8>)>::new();

    for coords in touched {
        let key = chunk_key(&coords);
        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
        let chunk_extent = chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape);
        let intersection = chunk_intersection(&ranges, &chunk_start, &chunk_extent);
        if intersection.is_empty() {
            continue;
        }

        let (entry_index, existed, mut entry, mut chunk_payload) = load_or_init_chunk(
            root,
            array_name,
            &key,
            &coords,
            &chunk_extent,
            &pos_by_key,
            &chunk_index,
        )
        .await?;

        let mut local = vec![0usize; intersection.len()];
        let intersection_shape: Vec<usize> = intersection
            .iter()
            .map(|r| r.end.saturating_sub(r.start))
            .collect();
        loop {
            let mut global = Vec::with_capacity(intersection.len());
            for dim in 0..intersection.len() {
                global.push(intersection[dim].start + local[dim]);
            }
            let rhs_index: Vec<usize> = global
                .iter()
                .enumerate()
                .map(|(dim, g)| g.saturating_sub(ranges[dim].start))
                .collect();
            let chunk_index_local: Vec<usize> = global
                .iter()
                .enumerate()
                .map(|(dim, g)| g.saturating_sub(chunk_start[dim]))
                .collect();
            let rhs_linear = linear_index_column_major(&rhs_index, &rhs_shape)?;
            let chunk_linear = linear_index_column_major(&chunk_index_local, &chunk_extent)?;
            chunk_payload.values[chunk_linear] = rhs_values[rhs_linear];
            if !advance_index(&mut local, &intersection_shape) {
                break;
            }
        }

        let chunk_bytes = serde_json::to_vec(&chunk_payload)
            .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
        let chunk_path = root.join(&entry.data_path);
        runmat_filesystem::write_async(&chunk_path, &chunk_bytes)
            .await
            .map_err(|err| {
                data_error(format!(
                    "failed to write chunk payload '{}': {err}",
                    chunk_path.display()
                ))
            })?;

        entry.coords = coords.clone();
        entry.shape = chunk_extent.clone();
        entry.bytes_raw = chunk_bytes.len() as u64;
        entry.bytes_stored = chunk_bytes.len() as u64;
        entry.hash = sha256_hex(&chunk_bytes);
        if existed {
            chunk_index.chunks[entry_index] = entry.clone();
        } else {
            chunk_index.chunks.push(entry.clone());
            pos_by_key.insert(key.clone(), chunk_index.chunks.len() - 1);
        }
        upload_batch.push((
            DataChunkDescriptor {
                key: key.clone(),
                object_id: entry.object_id.clone(),
                hash: entry.hash.clone(),
                bytes_raw: entry.bytes_raw,
                bytes_stored: entry.bytes_stored,
            },
            chunk_bytes,
        ));
    }

    maybe_upload_chunk_batch_async(root, array_name, upload_batch).await?;
    tracing::info!(
        target: "runmat.data",
        dataset = %root.display(),
        array = array_name,
        touched_chunks = chunk_index.chunks.len(),
        "chunked slice write committed"
    );
    let index_write = serde_json::to_vec(&chunk_index)
        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
    runmat_filesystem::write_async(&index_path, &index_write)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to write chunk index '{}': {err}",
                index_path.display()
            ))
        })?;

    if let Some(updated) = manifest.arrays.get_mut(array_name) {
        updated.shape = meta.shape.clone();
        updated.chunk_index_path = Some(index_rel_path.clone());
    }
    Ok(true)
}
1697
1698fn value_to_tensor_shape_values(value: &Value) -> BuiltinResult<(Vec<usize>, Vec<f64>)> {
1699 match value {
1700 Value::Tensor(t) => Ok((t.shape.clone(), t.data.clone())),
1701 Value::Num(n) => Ok((vec![1, 1], vec![*n])),
1702 Value::Int(i) => Ok((vec![1, 1], vec![i.to_i64() as f64])),
1703 _ => Err(data_error(
1704 "DataArray.write supports tensor or numeric scalar values",
1705 )),
1706 }
1707}
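
// A minimal sketch, not part of the original test suite, of the scalar promotion rule
// used by DataArray.write: bare numbers become 1x1 tensors, and unsupported value
// kinds are rejected. The literals below are illustrative only.
#[cfg(test)]
mod value_shape_examples {
    use super::*;

    #[test]
    fn scalars_are_promoted_to_one_by_one() {
        let Ok((shape, values)) = value_to_tensor_shape_values(&Value::Num(3.5)) else {
            panic!("scalar should convert");
        };
        assert_eq!(shape, vec![1, 1]);
        assert_eq!(values, vec![3.5]);
        assert!(value_to_tensor_shape_values(&Value::Bool(true)).is_err());
    }
}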
1708
1709#[derive(Clone, Copy, Debug)]
1710struct DimRange {
1711 start: usize,
1712 end: usize,
1713}
1714
1715fn read_slice_payload(
1716 payload: &DataArrayPayload,
1717 slice_spec: &Value,
1718) -> BuiltinResult<DataArrayPayload> {
1719 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
1720 let out_shape: Vec<usize> = ranges
1721 .iter()
1722 .map(|r| r.end.saturating_sub(r.start))
1723 .collect();
1724 let mut out_values = Vec::new();
1725 let mut out_index = vec![0usize; out_shape.len()];
1726 loop {
1727 let source_index: Vec<usize> = out_index
1728 .iter()
1729 .enumerate()
1730 .map(|(dim, idx)| ranges[dim].start + *idx)
1731 .collect();
1732 let linear = linear_index_column_major(&source_index, &payload.shape)?;
1733 out_values.push(payload.values[linear]);
1734
1735 if !advance_index(&mut out_index, &out_shape) {
1736 break;
1737 }
1738 }
1739 Ok(DataArrayPayload {
1740 dtype: payload.dtype.clone(),
1741 shape: out_shape,
1742 values: out_values,
1743 })
1744}
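
// A minimal sketch, not part of the original tests, showing how a cell slice spec
// extracts a sub-block from a column-major payload. The 3x2 values and the row
// range are illustrative only.
#[cfg(test)]
mod slice_read_examples {
    use super::*;
    use runmat_builtins::CellArray;

    #[test]
    fn reads_rows_one_and_two_across_all_columns() {
        // Column-major 3x2 payload: columns [1, 2, 3] and [4, 5, 6].
        let payload = DataArrayPayload {
            dtype: "f64".to_string(),
            shape: vec![3, 2],
            values: vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        };
        // MATLAB-style spec {1:2, :}.
        let spec = Value::Cell(
            CellArray::new(
                vec![
                    Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
                    Value::String(":".to_string()),
                ],
                1,
                2,
            )
            .expect("slice cell"),
        );
        let Ok(slice) = read_slice_payload(&payload, &spec) else {
            panic!("slice read should succeed");
        };
        assert_eq!(slice.shape, vec![2, 2]);
        assert_eq!(slice.values, vec![1.0, 2.0, 4.0, 5.0]);
    }
}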
1745
1746fn write_slice_payload(
1747 payload: &DataArrayPayload,
1748 slice_spec: &Value,
1749 rhs: &Value,
1750) -> BuiltinResult<DataArrayPayload> {
1751 let ranges = parse_slice_spec(slice_spec, &payload.shape)?;
1752 let target_shape: Vec<usize> = ranges
1753 .iter()
1754 .map(|r| r.end.saturating_sub(r.start))
1755 .collect();
1756 let (rhs_shape, rhs_values) = value_to_tensor_shape_values(rhs)?;
1757 if rhs_shape != target_shape {
1758 return Err(data_error(format!(
1759 "SHAPE_MISMATCH: rhs shape {:?} must match target slice shape {:?}",
1760 rhs_shape, target_shape
1761 )));
1762 }
1763
1764 let mut next = payload.values.clone();
1765 let mut rhs_index = vec![0usize; target_shape.len()];
1766 let mut rhs_linear = 0usize;
1767 loop {
1768 let target_index: Vec<usize> = rhs_index
1769 .iter()
1770 .enumerate()
1771 .map(|(dim, idx)| ranges[dim].start + *idx)
1772 .collect();
1773 let target_linear = linear_index_column_major(&target_index, &payload.shape)?;
1774 next[target_linear] = rhs_values[rhs_linear];
1775 rhs_linear += 1;
1776
1777 if !advance_index(&mut rhs_index, &target_shape) {
1778 break;
1779 }
1780 }
1781
1782 Ok(DataArrayPayload {
1783 dtype: payload.dtype.clone(),
1784 shape: payload.shape.clone(),
1785 values: next,
1786 })
1787}
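
// A minimal sketch, not part of the original tests, of an in-memory slice write:
// the rhs must match the target slice shape exactly, and elements outside the
// slice keep their original values. The payload below is illustrative only.
#[cfg(test)]
mod slice_write_examples {
    use super::*;
    use runmat_builtins::CellArray;

    #[test]
    fn overwrites_only_the_selected_row() {
        let payload = DataArrayPayload {
            dtype: "f64".to_string(),
            shape: vec![3, 2],
            values: vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        };
        // MATLAB-style spec {2, :} selects the second row across both columns.
        let spec = Value::Cell(
            CellArray::new(
                vec![Value::Num(2.0), Value::String(":".to_string())],
                1,
                2,
            )
            .expect("slice cell"),
        );
        let rhs = Value::Tensor(Tensor::new(vec![20.0, 50.0], vec![1, 2]).expect("rhs"));
        let Ok(updated) = write_slice_payload(&payload, &spec, &rhs) else {
            panic!("slice write should succeed");
        };
        assert_eq!(updated.shape, vec![3, 2]);
        assert_eq!(updated.values, vec![1.0, 20.0, 3.0, 4.0, 50.0, 6.0]);
    }
}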
1788
1789fn parse_slice_spec(slice_spec: &Value, shape: &[usize]) -> BuiltinResult<Vec<DimRange>> {
1790 match slice_spec {
1791 Value::Cell(cell) => {
1792 if cell.data.is_empty() {
1793 return Err(data_error("INVALID_SLICE: empty slice specification"));
1794 }
1795 let mut ranges = Vec::with_capacity(shape.len());
1796 for (dim, extent) in shape.iter().enumerate() {
1797 if let Some(item) = cell.data.get(dim).map(|v| &**v) {
1798 ranges.push(parse_dim_range(item, *extent)?);
1799 } else {
1800 ranges.push(DimRange {
1801 start: 0,
1802 end: *extent,
1803 });
1804 }
1805 }
1806 Ok(ranges)
1807 }
1808 Value::String(s) if s == ":" => Ok(shape
1809 .iter()
1810 .map(|extent| DimRange {
1811 start: 0,
1812 end: *extent,
1813 })
1814 .collect()),
1815 _ => Err(data_error(
1816 "INVALID_SLICE: slice must be a cell spec like {1:10, :} or ':'",
1817 )),
1818 }
1819}
1820
1821fn parse_dim_range(value: &Value, extent: usize) -> BuiltinResult<DimRange> {
1822 if extent == 0 {
1823 return Ok(DimRange { start: 0, end: 0 });
1824 }
1825 match value {
1826 Value::String(s) if s == ":" => Ok(DimRange {
1827 start: 0,
1828 end: extent,
1829 }),
1830 Value::Num(n) => {
1831 let idx = (*n as isize) - 1;
1832 if idx < 0 || idx as usize >= extent {
1833 return Err(data_error("INVALID_SLICE: index out of bounds"));
1834 }
1835 Ok(DimRange {
1836 start: idx as usize,
1837 end: idx as usize + 1,
1838 })
1839 }
1840 Value::Int(i) => {
1841 let idx = i.to_i64() - 1;
1842 if idx < 0 || idx as usize >= extent {
1843 return Err(data_error("INVALID_SLICE: index out of bounds"));
1844 }
1845 Ok(DimRange {
1846 start: idx as usize,
1847 end: idx as usize + 1,
1848 })
1849 }
1850 Value::Tensor(t) if t.data.len() == 2 => {
1851 let start = (t.data[0] as isize) - 1;
1852 let end_inclusive = (t.data[1] as isize) - 1;
1853 if start < 0 || end_inclusive < start || end_inclusive as usize >= extent {
1854 return Err(data_error("INVALID_SLICE: range out of bounds"));
1855 }
1856 Ok(DimRange {
1857 start: start as usize,
1858 end: end_inclusive as usize + 1,
1859 })
1860 }
1861 _ => Err(data_error(
1862 "INVALID_SLICE: dimension must be ':', scalar index, or [start end] range",
1863 )),
1864 }
1865}
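
// A minimal sketch, not part of the original tests, of how the 1-based MATLAB-style
// dimension specs above are converted to 0-based half-open ranges. The extents and
// indices are illustrative only.
#[cfg(test)]
mod dim_range_examples {
    use super::*;

    #[test]
    fn converts_one_based_specs_to_half_open_ranges() {
        // A scalar index selects a single element.
        let Ok(single) = parse_dim_range(&Value::Num(2.0), 4) else {
            panic!("scalar index should parse");
        };
        assert_eq!((single.start, single.end), (1, 2));
        // A [start end] pair is inclusive on both ends.
        let pair = Value::Tensor(Tensor::new(vec![2.0, 4.0], vec![1, 2]).expect("range tensor"));
        let Ok(range) = parse_dim_range(&pair, 4) else {
            panic!("range should parse");
        };
        assert_eq!((range.start, range.end), (1, 4));
        // Indices past the extent are rejected.
        assert!(parse_dim_range(&Value::Num(5.0), 4).is_err());
    }
}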
1866
1867fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
1868 if index.len() != shape.len() {
1869 return Err(data_error("INVALID_SLICE: rank mismatch"));
1870 }
1871 let mut stride = 1usize;
1872 let mut linear = 0usize;
1873 for (idx, extent) in index.iter().zip(shape.iter()) {
1874 if *idx >= *extent {
1875 return Err(data_error("INVALID_SLICE: index out of bounds"));
1876 }
1877 linear += idx * stride;
1878 stride = stride.saturating_mul(*extent);
1879 }
1880 Ok(linear)
1881}
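
// A minimal worked example, not part of the original tests, of the column-major
// (Fortran-order) linearization above: for shape [3, 4], element [2, 1] sits at
// 2 + 1 * 3 = 5.
#[cfg(test)]
mod linear_index_examples {
    use super::*;

    #[test]
    fn uses_column_major_strides() {
        assert_eq!(linear_index_column_major(&[2, 1], &[3, 4]).ok(), Some(5));
        // Indices outside the shape are rejected.
        assert!(linear_index_column_major(&[3, 0], &[3, 4]).is_err());
    }
}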
1882
1883fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
1884 if shape.is_empty() {
1885 return false;
1886 }
1887 for dim in 0..shape.len() {
1888 index[dim] += 1;
1889 if index[dim] < shape[dim] {
1890 return true;
1891 }
1892 index[dim] = 0;
1893 }
1894 false
1895}
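
// A minimal sketch, not part of the original tests, of the odometer-style iteration
// implemented above: the fastest-varying dimension is dimension 0, and the index
// wraps back to all zeros once every position has been visited.
#[cfg(test)]
mod advance_index_examples {
    use super::*;

    #[test]
    fn walks_every_index_in_odometer_order() {
        // Starting from [0, 0], a 2x3 shape yields five further positions before wrapping.
        let shape = [2usize, 3];
        let mut index = vec![0usize; shape.len()];
        let mut visited = 1usize;
        while advance_index(&mut index, &shape) {
            visited += 1;
        }
        assert_eq!(visited, 6);
        assert_eq!(index, vec![0, 0]);
    }
}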
1896
1897fn chunk_key(coords: &[usize]) -> String {
1898 coords
1899 .iter()
1900 .map(|v| v.to_string())
1901 .collect::<Vec<_>>()
1902 .join(".")
1903}
1904
1905fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
1906 coords
1907 .iter()
1908 .enumerate()
1909 .map(|(dim, coord)| coord * chunk_shape.get(dim).copied().unwrap_or(1).max(1))
1910 .collect()
1911}
1912
1913fn chunk_extent_for_start(start: &[usize], chunk_shape: &[usize], shape: &[usize]) -> Vec<usize> {
1914 start
1915 .iter()
1916 .enumerate()
1917 .map(|(dim, start)| {
1918 let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
1919 let end = (*start + chunk).min(shape[dim]);
1920 end.saturating_sub(*start)
1921 })
1922 .collect()
1923}
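
// A minimal sketch, not part of the original tests, of the chunk geometry helpers:
// an illustrative 5x3 array split into 2x2 chunks has an edge chunk that is
// truncated to the array bounds.
#[cfg(test)]
mod chunk_geometry_examples {
    use super::*;

    #[test]
    fn edge_chunks_are_truncated_to_the_array_shape() {
        let start = chunk_start_for_coords(&[2, 1], &[2, 2]);
        assert_eq!(start, vec![4, 2]);
        let extent = chunk_extent_for_start(&start, &[2, 2], &[5, 3]);
        assert_eq!(extent, vec![1, 1]);
    }
}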
1924
1925fn chunk_intersection(
1926 ranges: &[DimRange],
1927 chunk_start: &[usize],
1928 chunk_extent: &[usize],
1929) -> Vec<DimRange> {
1930 let mut out = Vec::with_capacity(ranges.len());
1931 for dim in 0..ranges.len() {
1932 let c_start = chunk_start[dim];
1933 let c_end = c_start + chunk_extent[dim];
1934 let start = ranges[dim].start.max(c_start);
1935 let end = ranges[dim].end.min(c_end);
1936 if start >= end {
1937 return Vec::new();
1938 }
1939 out.push(DimRange { start, end });
1940 }
1941 out
1942}
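
// A minimal sketch, not part of the original tests, of the intersection logic above:
// the result is empty as soon as any dimension fails to overlap. The ranges and
// chunk bounds are illustrative only.
#[cfg(test)]
mod chunk_intersection_examples {
    use super::*;

    #[test]
    fn returns_empty_when_ranges_miss_the_chunk() {
        // Requested rows 1..4 against a chunk covering rows 2..4: overlap is 2..4.
        let ranges = [DimRange { start: 1, end: 4 }];
        let hit = chunk_intersection(&ranges, &[2], &[2]);
        assert_eq!((hit[0].start, hit[0].end), (2, 4));
        // A chunk covering rows 4..6 does not overlap the request at all.
        assert!(chunk_intersection(&ranges, &[4], &[2]).is_empty());
    }
}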
1943
1944fn touched_chunk_coords(
1945 ranges: &[DimRange],
1946 chunk_shape: &[usize],
1947 shape: &[usize],
1948) -> Vec<Vec<usize>> {
1949 let mut span = Vec::with_capacity(ranges.len());
1950 let mut begin = Vec::with_capacity(ranges.len());
1951 for dim in 0..ranges.len() {
1952 if shape[dim] == 0 {
1953 return Vec::new();
1954 }
1955 let chunk = chunk_shape.get(dim).copied().unwrap_or(1).max(1);
1956 let first = ranges[dim].start / chunk;
1957 let last = (ranges[dim].end.saturating_sub(1)) / chunk;
1958 begin.push(first);
1959 span.push(last.saturating_sub(first) + 1);
1960 }
1961 let mut local = vec![0usize; span.len()];
1962 let mut out = Vec::new();
1963 loop {
1964 out.push(
1965 local
1966 .iter()
1967 .enumerate()
1968 .map(|(dim, v)| begin[dim] + *v)
1969 .collect::<Vec<_>>(),
1970 );
1971 if !advance_index(&mut local, &span) {
1972 break;
1973 }
1974 }
1975 out
1976}
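
// A minimal sketch, not part of the original tests, of which chunk coordinates a
// slice touches: rows 2..4 (0-based, exclusive end) and all three columns of an
// illustrative 5x3 array with 2x2 chunks land in the second chunk row only.
#[cfg(test)]
mod touched_chunk_examples {
    use super::*;

    #[test]
    fn lists_only_chunks_overlapping_the_requested_ranges() {
        let ranges = [DimRange { start: 2, end: 4 }, DimRange { start: 0, end: 3 }];
        let coords = touched_chunk_coords(&ranges, &[2, 2], &[5, 3]);
        assert_eq!(coords, vec![vec![1, 0], vec![1, 1]]);
    }
}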
1977
/// Uploads freshly written chunk payloads to the upload targets advertised by the
/// active filesystem provider; providers that report `Unsupported` are treated as
/// local-only and the batch is skipped without error.
1978async fn maybe_upload_chunk_batch_async(
1979 root: &std::path::Path,
1980 array_name: &str,
1981 batch: Vec<(DataChunkDescriptor, Vec<u8>)>,
1982) -> BuiltinResult<()> {
1983 if batch.is_empty() {
1984 return Ok(());
1985 }
1986 let request = DataChunkUploadRequest {
1987 dataset_path: root.to_string_lossy().to_string(),
1988 array: array_name.to_string(),
1989 chunks: batch.iter().map(|(d, _)| d.clone()).collect(),
1990 };
1991 let targets = match runmat_filesystem::data_chunk_upload_targets_async(&request).await {
1992 Ok(targets) => targets,
1993 Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
1994 Err(err) => {
1995 return Err(data_error(format!(
1996 "failed to request data chunk upload targets: {err}"
1997 )))
1998 }
1999 };
2000 for (descriptor, bytes) in batch {
2001 let target = targets
2002 .iter()
2003 .find(|t| t.key == descriptor.key)
2004 .ok_or_else(|| {
2005 data_error(format!(
2006 "missing upload target for chunk '{}'",
2007 descriptor.key
2008 ))
2009 })?;
2010 runmat_filesystem::data_upload_chunk_async(target, &bytes)
2011 .await
2012 .map_err(|err| {
2013 data_error(format!(
2014 "failed to upload chunk '{}': {err}",
2015 descriptor.key
2016 ))
2017 })?;
2018 tracing::info!(
2019 target: "runmat.data",
2020 dataset = %root.display(),
2021 array = array_name,
2022 chunk_key = descriptor.key,
2023 bytes = bytes.len(),
2024 "chunk upload completed"
2025 );
2026 }
2027 Ok(())
2028}
2029
2030fn chunk_rel_path(array_name: &str, object_id: &str) -> String {
2031 format!("arrays/{array_name}/chunks/{object_id}.json")
2032}
2033
/// Loads the existing payload for a chunk key, or synthesizes a zero-filled f64
/// entry and payload for a chunk that has not been materialized yet. Returns the
/// entry's slot in the chunk index, whether it already existed, the entry, and its payload.
2034async fn load_or_init_chunk(
2035 root: &std::path::Path,
2036 array_name: &str,
2037 key: &str,
2038 coords: &[usize],
2039 chunk_extent: &[usize],
2040 pos_by_key: &HashMap<String, usize>,
2041 chunk_index: &DataChunkIndex,
2042) -> BuiltinResult<(usize, bool, DataChunkIndexEntry, DataArrayPayload)> {
2043 if let Some(index) = pos_by_key.get(key).copied() {
2044 let entry = chunk_index
2045 .chunks
2046 .get(index)
2047 .cloned()
2048 .ok_or_else(|| data_error(format!("chunk index missing key '{key}'")))?;
2049 let bytes = runmat_filesystem::read_async(root.join(&entry.data_path))
2050 .await
2051 .map_err(|err| {
2052 data_error(format!(
2053 "failed to read chunk payload '{}': {err}",
2054 entry.data_path
2055 ))
2056 })?;
2057 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
2058 data_error(format!(
2059 "failed to parse chunk payload '{}': {err}",
2060 entry.data_path
2061 ))
2062 })?;
2063 return Ok((index, true, entry, payload));
2064 }
2065
2066 let object_id = format!("obj_{}", key.replace('.', "_"));
2067 let entry = DataChunkIndexEntry {
2068 key: key.to_string(),
2069 object_id: object_id.clone(),
2070 hash: String::new(),
2071 bytes_raw: 0,
2072 bytes_stored: 0,
2073 coords: coords.to_vec(),
2074 shape: chunk_extent.to_vec(),
2075 data_path: chunk_rel_path(array_name, &object_id),
2076 };
2077 let payload = DataArrayPayload {
2078 dtype: "f64".to_string(),
2079 shape: chunk_extent.to_vec(),
2080 values: vec![0.0; chunk_extent.iter().copied().product()],
2081 };
2082 Ok((chunk_index.chunks.len(), false, entry, payload))
2083}
2084
2085fn attrs_to_struct(attrs: &BTreeMap<String, serde_json::Value>) -> Value {
2086 let mut out = StructValue::new();
2087 for (k, v) in attrs {
2088 out.fields.insert(k.clone(), json_to_value(v));
2089 }
2090 Value::Struct(out)
2091}
2092
2093fn value_to_json(value: &Value) -> serde_json::Value {
2094 match value {
2095 Value::String(s) => serde_json::Value::String(s.clone()),
2096 Value::CharArray(ca) => serde_json::Value::String(ca.to_string()),
2097 Value::Num(n) => serde_json::json!(n),
2098 Value::Int(i) => serde_json::json!(i.to_i64()),
2099 Value::Bool(b) => serde_json::json!(b),
2100 _ => serde_json::Value::String(format!("{value:?}")),
2101 }
2102}
2103
2104fn json_to_value(value: &serde_json::Value) -> Value {
2105 match value {
2106 serde_json::Value::Bool(b) => Value::Bool(*b),
2107 serde_json::Value::Number(n) => Value::Num(n.as_f64().unwrap_or_default()),
2108 serde_json::Value::String(s) => Value::String(s.clone()),
2109 serde_json::Value::Array(arr) => {
2110 let vals = arr.iter().map(json_to_value).collect::<Vec<_>>();
2111 crate::make_cell(vals.clone(), 1, vals.len())
2112 .unwrap_or_else(|_| Value::String("<invalid-array>".to_string()))
2113 }
2114 serde_json::Value::Object(map) => {
2115 let mut s = StructValue::new();
2116 for (k, v) in map {
2117 s.fields.insert(k.clone(), json_to_value(v));
2118 }
2119 Value::Struct(s)
2120 }
2121 serde_json::Value::Null => Value::String("".to_string()),
2122 }
2123}
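
// A minimal sketch, not part of the original tests, showing that scalar attributes
// survive a round trip through the JSON conversion helpers above. The attribute
// values are illustrative only.
#[cfg(test)]
mod attr_json_examples {
    use super::*;

    #[test]
    fn scalar_attrs_round_trip_between_json_and_values() {
        let number = serde_json::json!(42.0);
        assert_eq!(value_to_json(&json_to_value(&number)), number);
        let text = serde_json::json!("celsius");
        assert_eq!(value_to_json(&json_to_value(&text)), text);
    }
}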
2124
2125#[cfg(test)]
2126mod tests {
2127 use super::*;
2128 use crate::dispatcher::call_builtin;
2129 use async_trait::async_trait;
2130 use axum::extract::{Query, State};
2131 use axum::http::{HeaderMap, StatusCode};
2132 use axum::routing::{post, put};
2133 use axum::{Json, Router};
2134 use runmat_builtins::CellArray;
2135 use runmat_filesystem::data_contract::{
2136 DataChunkUploadRequest, DataChunkUploadTarget, DataManifestDescriptor, DataManifestRequest,
2137 };
2138 use runmat_filesystem::{
2139 DirEntry, FileHandle, FsMetadata, FsProvider, NativeFsProvider, OpenFlags,
2140 };
2141 use serde::Deserialize;
2142 use std::path::Path;
2143 use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
2144 use tokio::runtime::Runtime;
2145 use tokio::sync::oneshot;
2146
2147 fn serial_test_guard() -> MutexGuard<'static, ()> {
2148 static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
2149 LOCK.get_or_init(|| Mutex::new(()))
2150 .lock()
2151 .expect("data test serial lock poisoned")
2152 }
2153
2154 #[derive(Default)]
2155 struct CountingDataUploadProvider {
2156 inner: NativeFsProvider,
2157 uploaded_keys: Arc<Mutex<Vec<String>>>,
2158 }
2159
2160 struct HttpDataUploadProvider {
2161 inner: NativeFsProvider,
2162 base_url: String,
2163 client: reqwest::blocking::Client,
2164 }
2165
2166 impl HttpDataUploadProvider {
2167 fn new(base_url: String) -> Self {
2168 Self {
2169 inner: NativeFsProvider,
2170 base_url,
2171 client: reqwest::blocking::Client::new(),
2172 }
2173 }
2174 }
2175
2176 #[async_trait(?Send)]
2177 impl FsProvider for HttpDataUploadProvider {
2178 fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
2179 self.inner.open(path, flags)
2180 }
2181
2182 async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
2183 self.inner.read(path).await
2184 }
2185
2186 async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
2187 self.inner.write(path, data).await
2188 }
2189
2190 async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
2191 self.inner.remove_file(path).await
2192 }
2193
2194 async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
2195 self.inner.metadata(path).await
2196 }
2197
2198 async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
2199 self.inner.symlink_metadata(path).await
2200 }
2201
2202 async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
2203 self.inner.read_dir(path).await
2204 }
2205
2206 async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
2207 self.inner.canonicalize(path).await
2208 }
2209
2210 async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
2211 self.inner.create_dir(path).await
2212 }
2213
2214 async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
2215 self.inner.create_dir_all(path).await
2216 }
2217
2218 async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
2219 self.inner.remove_dir(path).await
2220 }
2221
2222 async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
2223 self.inner.remove_dir_all(path).await
2224 }
2225
2226 async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
2227 self.inner.rename(from, to).await
2228 }
2229
2230 async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
2231 self.inner.set_readonly(path, readonly).await
2232 }
2233
2234 async fn data_manifest_descriptor(
2235 &self,
2236 request: &DataManifestRequest,
2237 ) -> std::io::Result<DataManifestDescriptor> {
2238 self.inner.data_manifest_descriptor(request).await
2239 }
2240
2241 async fn data_chunk_upload_targets(
2242 &self,
2243 request: &DataChunkUploadRequest,
2244 ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
2245 #[derive(Deserialize)]
2246 struct UploadTargetsResponse {
2247 targets: Vec<DataChunkUploadTarget>,
2248 }
2249 let url = format!("{}/data/chunks/upload-targets", self.base_url);
2250 let response = self
2251 .client
2252 .post(url)
2253 .json(request)
2254 .send()
2255 .map_err(|err| std::io::Error::other(err.to_string()))?;
2256 if !response.status().is_success() {
2257 return Err(std::io::Error::other(format!(
2258 "upload targets request failed: {}",
2259 response.status()
2260 )));
2261 }
2262 let parsed: UploadTargetsResponse = response
2263 .json()
2264 .map_err(|err| std::io::Error::other(err.to_string()))?;
2265 Ok(parsed.targets)
2266 }
2267
2268 async fn data_upload_chunk(
2269 &self,
2270 target: &DataChunkUploadTarget,
2271 data: &[u8],
2272 ) -> std::io::Result<()> {
2273 let upload_url = if let Some(key) = target.upload_url.strip_prefix("upload://") {
2274 format!("{}/upload?key={}", self.base_url, key)
2275 } else {
2276 target.upload_url.clone()
2277 };
2278 let method = reqwest::Method::from_bytes(target.method.as_bytes())
2279 .map_err(|err| std::io::Error::other(err.to_string()))?;
2280 let mut request = self.client.request(method, &upload_url);
2281 for (k, v) in &target.headers {
2282 request = request.header(k, v);
2283 }
2284 let response = request
2285 .body(data.to_vec())
2286 .send()
2287 .map_err(|err| std::io::Error::other(err.to_string()))?;
2288 if !response.status().is_success() {
2289 return Err(std::io::Error::other(format!(
2290 "chunk upload failed: {}",
2291 response.status()
2292 )));
2293 }
2294 Ok(())
2295 }
2296 }
2297
2298 #[derive(Clone, Default)]
2299 struct UploadHarness {
2300 uploads: Arc<Mutex<Vec<String>>>,
2301 }
2302
2303 #[derive(Deserialize)]
2304 struct UploadChunkQuery {
2305 key: String,
2306 }
2307
2308 async fn upload_targets_handler(
2309 Json(req): Json<DataChunkUploadRequest>,
2310 ) -> Result<Json<serde_json::Value>, StatusCode> {
2311 let targets = req
2312 .chunks
2313 .iter()
2314 .map(|chunk| {
2315 serde_json::json!({
2316 "key": chunk.key,
2317 "method": "PUT",
2318 "upload_url": format!("upload://{}", chunk.key),
2319 "headers": {
2320 "x-runmat-hash": chunk.hash,
2321 }
2322 })
2323 })
2324 .collect::<Vec<_>>();
2325 Ok(Json(serde_json::json!({ "targets": targets })))
2326 }
2327
2328 async fn upload_handler(
2329 State(harness): State<UploadHarness>,
2330 Query(query): Query<UploadChunkQuery>,
2331 headers: HeaderMap,
2332 body: axum::body::Bytes,
2333 ) -> Result<(), StatusCode> {
2334 if body.is_empty() {
2335 return Err(StatusCode::BAD_REQUEST);
2336 }
2337 if headers.get("x-runmat-hash").is_none() {
2338 return Err(StatusCode::BAD_REQUEST);
2339 }
2340 let mut guard = harness.uploads.lock().expect("uploads lock poisoned");
2341 guard.push(query.key);
2342 Ok(())
2343 }
2344
2345 fn spawn_upload_server() -> (
2346 String,
2347 Arc<Mutex<Vec<String>>>,
2348 Runtime,
2349 oneshot::Sender<()>,
2350 ) {
2351 let harness = UploadHarness::default();
2352 let uploads = Arc::clone(&harness.uploads);
2353 let runtime = Runtime::new().expect("tokio runtime");
2354 let (addr, shutdown_tx) = runtime.block_on(async move {
2355 let listener = tokio::net::TcpListener::bind((std::net::Ipv4Addr::LOCALHOST, 0))
2356 .await
2357 .expect("bind upload server");
2358 let addr = listener.local_addr().expect("local addr");
2359 let app = Router::new()
2360 .route("/data/chunks/upload-targets", post(upload_targets_handler))
2361 .route("/upload", put(upload_handler))
2362 .with_state(harness);
2363 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
2364 let server = axum::serve(listener, app).with_graceful_shutdown(async {
2365 let _ = shutdown_rx.await;
2366 });
2367 tokio::spawn(async move {
2368 let _ = server.await;
2369 });
2370 (addr, shutdown_tx)
2371 });
2372 (format!("http://{}", addr), uploads, runtime, shutdown_tx)
2373 }
2374
2375 impl CountingDataUploadProvider {
2376 fn uploaded_keys(&self) -> Arc<Mutex<Vec<String>>> {
2377 Arc::clone(&self.uploaded_keys)
2378 }
2379 }
2380
2381 #[async_trait(?Send)]
2382 impl FsProvider for CountingDataUploadProvider {
2383 fn open(&self, path: &Path, flags: &OpenFlags) -> std::io::Result<Box<dyn FileHandle>> {
2384 self.inner.open(path, flags)
2385 }
2386
2387 async fn read(&self, path: &Path) -> std::io::Result<Vec<u8>> {
2388 self.inner.read(path).await
2389 }
2390
2391 async fn write(&self, path: &Path, data: &[u8]) -> std::io::Result<()> {
2392 self.inner.write(path, data).await
2393 }
2394
2395 async fn remove_file(&self, path: &Path) -> std::io::Result<()> {
2396 self.inner.remove_file(path).await
2397 }
2398
2399 async fn metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
2400 self.inner.metadata(path).await
2401 }
2402
2403 async fn symlink_metadata(&self, path: &Path) -> std::io::Result<FsMetadata> {
2404 self.inner.symlink_metadata(path).await
2405 }
2406
2407 async fn read_dir(&self, path: &Path) -> std::io::Result<Vec<DirEntry>> {
2408 self.inner.read_dir(path).await
2409 }
2410
2411 async fn canonicalize(&self, path: &Path) -> std::io::Result<std::path::PathBuf> {
2412 self.inner.canonicalize(path).await
2413 }
2414
2415 async fn create_dir(&self, path: &Path) -> std::io::Result<()> {
2416 self.inner.create_dir(path).await
2417 }
2418
2419 async fn create_dir_all(&self, path: &Path) -> std::io::Result<()> {
2420 self.inner.create_dir_all(path).await
2421 }
2422
2423 async fn remove_dir(&self, path: &Path) -> std::io::Result<()> {
2424 self.inner.remove_dir(path).await
2425 }
2426
2427 async fn remove_dir_all(&self, path: &Path) -> std::io::Result<()> {
2428 self.inner.remove_dir_all(path).await
2429 }
2430
2431 async fn rename(&self, from: &Path, to: &Path) -> std::io::Result<()> {
2432 self.inner.rename(from, to).await
2433 }
2434
2435 async fn set_readonly(&self, path: &Path, readonly: bool) -> std::io::Result<()> {
2436 self.inner.set_readonly(path, readonly).await
2437 }
2438
2439 async fn data_manifest_descriptor(
2440 &self,
2441 request: &DataManifestRequest,
2442 ) -> std::io::Result<DataManifestDescriptor> {
2443 self.inner.data_manifest_descriptor(request).await
2444 }
2445
2446 async fn data_chunk_upload_targets(
2447 &self,
2448 request: &DataChunkUploadRequest,
2449 ) -> std::io::Result<Vec<DataChunkUploadTarget>> {
2450 Ok(request
2451 .chunks
2452 .iter()
2453 .map(|chunk| DataChunkUploadTarget {
2454 key: chunk.key.clone(),
2455 method: "PUT".to_string(),
2456 upload_url: format!("count://{}", chunk.object_id),
2457 headers: std::collections::HashMap::new(),
2458 })
2459 .collect())
2460 }
2461
2462 async fn data_upload_chunk(
2463 &self,
2464 target: &DataChunkUploadTarget,
2465 _data: &[u8],
2466 ) -> std::io::Result<()> {
2467 let mut guard = match self.uploaded_keys.lock() {
2468 Ok(guard) => guard,
2469 Err(poisoned) => poisoned.into_inner(),
2470 };
2471 guard.push(target.key.clone());
2472 Ok(())
2473 }
2474 }
2475
2476 #[test]
2477 fn create_open_write_read_dataset() {
2478 let _serial = serial_test_guard();
2479 let dir = tempfile::tempdir().expect("tempdir");
2480 let path = dir.path().join("sample.data").to_string_lossy().to_string();
2481
2482 let mut array_meta = StructValue::new();
2483 array_meta
2484 .fields
2485 .insert("dtype".to_string(), Value::String("f64".to_string()));
2486 array_meta.fields.insert(
2487 "shape".to_string(),
2488 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
2489 );
2490 let mut arrays = StructValue::new();
2491 arrays
2492 .fields
2493 .insert("temperature".to_string(), Value::Struct(array_meta));
2494 let mut schema = StructValue::new();
2495 schema
2496 .fields
2497 .insert("arrays".to_string(), Value::Struct(arrays));
2498
2499 let ds = call_builtin(
2500 "data.create",
2501 &[
2502 Value::String(path.clone()),
2503 Value::Struct(schema),
2504 Value::Cell(runmat_builtins::CellArray::new(vec![], 1, 0).expect("cell")),
2505 ],
2506 )
2507 .expect("create dataset");
2508
2509 let arr = call_builtin(
2510 "Dataset.array",
2511 &[ds, Value::String("temperature".to_string())],
2512 )
2513 .expect("dataset array");
2514 let write_tensor = Tensor::new(vec![1.0, 2.0, 3.0, 4.0], vec![2, 2]).expect("write tensor");
2515 call_builtin(
2516 "DataArray.write",
2517 &[arr.clone(), Value::Tensor(write_tensor)],
2518 )
2519 .expect("write array");
2520
2521 let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
2522 let Value::Tensor(t) = read_back else {
2523 panic!("expected tensor");
2524 };
2525 assert_eq!(t.shape, vec![2, 2]);
2526 assert_eq!(t.data, vec![1.0, 2.0, 3.0, 4.0]);
2527 }
2528
2529 #[test]
2530 fn write_and_read_slice_payload() {
2531 let _serial = serial_test_guard();
2532 let dir = tempfile::tempdir().expect("tempdir");
2533 let path = dir.path().join("slice.data").to_string_lossy().to_string();
2534
2535 let mut array_meta = StructValue::new();
2536 array_meta
2537 .fields
2538 .insert("dtype".to_string(), Value::String("f64".to_string()));
2539 array_meta.fields.insert(
2540 "shape".to_string(),
2541 Value::Tensor(Tensor::new(vec![3.0, 3.0], vec![1, 2]).expect("shape tensor")),
2542 );
2543 let mut arrays = StructValue::new();
2544 arrays
2545 .fields
2546 .insert("temperature".to_string(), Value::Struct(array_meta));
2547 let mut schema = StructValue::new();
2548 schema
2549 .fields
2550 .insert("arrays".to_string(), Value::Struct(arrays));
2551
2552 let ds = call_builtin(
2553 "data.create",
2554 &[
2555 Value::String(path.clone()),
2556 Value::Struct(schema),
2557 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2558 ],
2559 )
2560 .expect("create dataset");
2561 let arr = call_builtin(
2562 "Dataset.array",
2563 &[ds, Value::String("temperature".to_string())],
2564 )
2565 .expect("dataset array");
2566
2567 let slice = Value::Cell(
2568 CellArray::new(
2569 vec![
2570 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2571 Value::String(":".to_string()),
2572 ],
2573 1,
2574 2,
2575 )
2576 .expect("slice cell"),
2577 );
2578 let rhs = Value::Tensor(
2579 Tensor::new(vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0], vec![2, 3]).expect("rhs"),
2580 );
2581 call_builtin("DataArray.write", &[arr.clone(), slice.clone(), rhs]).expect("slice write");
2582
2583 let read_back = call_builtin("DataArray.read", &[arr.clone(), slice]).expect("slice read");
2584 let Value::Tensor(t) = read_back else {
2585 panic!("expected tensor");
2586 };
2587 assert_eq!(t.shape, vec![2, 3]);
2588 assert_eq!(t.data, vec![10.0, 11.0, 12.0, 13.0, 14.0, 15.0]);
2589 }
2590
2591 #[test]
2592 fn slice_write_updates_only_touched_chunks() {
2593 let _serial = serial_test_guard();
2594 let dir = tempfile::tempdir().expect("tempdir");
2595 let path = dir
2596 .path()
2597 .join("chunked.data")
2598 .to_string_lossy()
2599 .to_string();
2600
2601 let mut array_meta = StructValue::new();
2602 array_meta
2603 .fields
2604 .insert("dtype".to_string(), Value::String("f64".to_string()));
2605 array_meta.fields.insert(
2606 "shape".to_string(),
2607 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2608 );
2609 array_meta.fields.insert(
2610 "chunk".to_string(),
2611 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2612 );
2613 let mut arrays = StructValue::new();
2614 arrays
2615 .fields
2616 .insert("temperature".to_string(), Value::Struct(array_meta));
2617 let mut schema = StructValue::new();
2618 schema
2619 .fields
2620 .insert("arrays".to_string(), Value::Struct(arrays));
2621
2622 let ds = call_builtin(
2623 "data.create",
2624 &[
2625 Value::String(path.clone()),
2626 Value::Struct(schema),
2627 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2628 ],
2629 )
2630 .expect("create dataset");
2631 let arr = call_builtin(
2632 "Dataset.array",
2633 &[ds, Value::String("temperature".to_string())],
2634 )
2635 .expect("dataset array");
2636
2637 let full = Value::Tensor(
2638 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4]).expect("full tensor"),
2639 );
2640 call_builtin("DataArray.write", &[arr.clone(), full]).expect("initial write");
2641
2642 let root = std::path::PathBuf::from(&path);
2643 let untouched_path = root.join("arrays/temperature/chunks/obj_1_1.json");
2644 let touched_path = root.join("arrays/temperature/chunks/obj_0_0.json");
2645 let untouched_before =
2646 futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
2647 .expect("read untouched before");
2648 let touched_before =
2649 futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
2650 .expect("read touched before");
2651
2652 let slice = Value::Cell(
2653 CellArray::new(
2654 vec![
2655 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2656 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2657 ],
2658 1,
2659 2,
2660 )
2661 .expect("slice cell"),
2662 );
2663 let rhs =
2664 Value::Tensor(Tensor::new(vec![99.0, 98.0, 97.0, 96.0], vec![2, 2]).expect("rhs"));
2665 call_builtin("DataArray.write", &[arr.clone(), slice, rhs]).expect("slice write");
2666
2667 let untouched_after =
2668 futures::executor::block_on(runmat_filesystem::read_async(&untouched_path))
2669 .expect("read untouched after");
2670 let touched_after =
2671 futures::executor::block_on(runmat_filesystem::read_async(&touched_path))
2672 .expect("read touched after");
2673 assert_eq!(untouched_before, untouched_after);
2674 assert_ne!(touched_before, touched_after);
2675 }
2676
2677 #[test]
2678 fn slice_write_uploads_only_touched_chunk_targets() {
2679 let _serial = serial_test_guard();
2680 let provider = Arc::new(CountingDataUploadProvider::default());
2681 let uploaded = provider.uploaded_keys();
2682 let _guard = runmat_filesystem::replace_provider(provider);
2683
2684 let dir = tempfile::tempdir().expect("tempdir");
2685 let path = dir
2686 .path()
2687 .join("remote-chunked.data")
2688 .to_string_lossy()
2689 .to_string();
2690
2691 let mut array_meta = StructValue::new();
2692 array_meta
2693 .fields
2694 .insert("dtype".to_string(), Value::String("f64".to_string()));
2695 array_meta.fields.insert(
2696 "shape".to_string(),
2697 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2698 );
2699 array_meta.fields.insert(
2700 "chunk".to_string(),
2701 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2702 );
2703 let mut arrays = StructValue::new();
2704 arrays
2705 .fields
2706 .insert("temperature".to_string(), Value::Struct(array_meta));
2707 let mut schema = StructValue::new();
2708 schema
2709 .fields
2710 .insert("arrays".to_string(), Value::Struct(arrays));
2711
2712 let ds = call_builtin(
2713 "data.create",
2714 &[
2715 Value::String(path.clone()),
2716 Value::Struct(schema),
2717 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2718 ],
2719 )
2720 .expect("create dataset");
2721 let arr = call_builtin(
2722 "Dataset.array",
2723 &[ds, Value::String("temperature".to_string())],
2724 )
2725 .expect("dataset array");
2726
2727 call_builtin(
2728 "DataArray.write",
2729 &[
2730 arr.clone(),
2731 Value::Tensor(
2732 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
2733 .expect("full tensor"),
2734 ),
2735 ],
2736 )
2737 .expect("initial write");
2738
2739 let manifest =
2740 futures::executor::block_on(crate::data::read_manifest_async(&dataset_root(&path)))
2741 .expect("manifest after initial write");
2742 let meta = manifest
2743 .arrays
2744 .get("temperature")
2745 .expect("temperature meta");
2746 let chunk_index_path =
2747 dataset_root(&path).join(meta.chunk_index_path.clone().expect("chunk index path"));
2748 assert!(
2749 futures::executor::block_on(runmat_filesystem::metadata_async(&chunk_index_path))
2750 .is_ok()
2751 );
2752
2753 {
2754 let mut keys = uploaded.lock().expect("uploaded keys lock");
2755 keys.clear();
2756 }
2757
2758 let slice = Value::Cell(
2759 CellArray::new(
2760 vec![
2761 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2762 Value::Tensor(Tensor::new(vec![1.0, 2.0], vec![1, 2]).expect("range")),
2763 ],
2764 1,
2765 2,
2766 )
2767 .expect("slice cell"),
2768 );
2769 let rhs = Value::Tensor(Tensor::new(vec![9.0, 8.0, 7.0, 6.0], vec![2, 2]).expect("rhs"));
2770 call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
2771
2772 let keys = uploaded.lock().expect("uploaded keys lock");
2773 assert_eq!(keys.as_slice(), ["0.0".to_string()].as_slice());
2774 }
2775
2776 #[test]
2777 fn slice_write_uploads_expected_cross_boundary_chunk_targets() {
2778 let _serial = serial_test_guard();
2779 let provider = Arc::new(CountingDataUploadProvider::default());
2780 let uploaded = provider.uploaded_keys();
2781 let _guard = runmat_filesystem::replace_provider(provider);
2782
2783 let dir = tempfile::tempdir().expect("tempdir");
2784 let path = dir
2785 .path()
2786 .join("remote-chunked-boundary.data")
2787 .to_string_lossy()
2788 .to_string();
2789
2790 let mut array_meta = StructValue::new();
2791 array_meta
2792 .fields
2793 .insert("dtype".to_string(), Value::String("f64".to_string()));
2794 array_meta.fields.insert(
2795 "shape".to_string(),
2796 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2797 );
2798 array_meta.fields.insert(
2799 "chunk".to_string(),
2800 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2801 );
2802 let mut arrays = StructValue::new();
2803 arrays
2804 .fields
2805 .insert("temperature".to_string(), Value::Struct(array_meta));
2806 let mut schema = StructValue::new();
2807 schema
2808 .fields
2809 .insert("arrays".to_string(), Value::Struct(arrays));
2810
2811 let ds = call_builtin(
2812 "data.create",
2813 &[
2814 Value::String(path.clone()),
2815 Value::Struct(schema),
2816 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2817 ],
2818 )
2819 .expect("create dataset");
2820 let arr = call_builtin(
2821 "Dataset.array",
2822 &[ds, Value::String("temperature".to_string())],
2823 )
2824 .expect("dataset array");
2825
2826 call_builtin(
2827 "DataArray.write",
2828 &[
2829 arr.clone(),
2830 Value::Tensor(
2831 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
2832 .expect("full tensor"),
2833 ),
2834 ],
2835 )
2836 .expect("initial write");
2837 {
2838 let mut keys = uploaded.lock().expect("uploaded keys lock");
2839 keys.clear();
2840 }
2841
2842 let slice = Value::Cell(
2843 CellArray::new(
2844 vec![
2845 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
2846 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
2847 ],
2848 1,
2849 2,
2850 )
2851 .expect("slice cell"),
2852 );
2853 let rhs =
2854 Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
2855 call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
2856
2857 let mut keys = uploaded.lock().expect("uploaded keys lock").clone();
2858 keys.sort();
2859 keys.dedup();
2860 assert_eq!(
2861 keys.as_slice(),
2862 [
2863 "0.0".to_string(),
2864 "0.1".to_string(),
2865 "1.0".to_string(),
2866 "1.1".to_string(),
2867 ]
2868 .as_slice()
2869 );
2870 }
2871
2872 #[test]
2873 fn slice_write_hits_http_server_data_endpoints_with_expected_keys() {
2874 let _serial = serial_test_guard();
2875 let (base_url, uploads, runtime, shutdown_tx) = spawn_upload_server();
2876 let provider = Arc::new(HttpDataUploadProvider::new(base_url));
2877 let _guard = runmat_filesystem::replace_provider(provider);
2878
2879 let dir = tempfile::tempdir().expect("tempdir");
2880 let path = dir
2881 .path()
2882 .join("http-endpoint.data")
2883 .to_string_lossy()
2884 .to_string();
2885
2886 let mut array_meta = StructValue::new();
2887 array_meta
2888 .fields
2889 .insert("dtype".to_string(), Value::String("f64".to_string()));
2890 array_meta.fields.insert(
2891 "shape".to_string(),
2892 Value::Tensor(Tensor::new(vec![4.0, 4.0], vec![1, 2]).expect("shape tensor")),
2893 );
2894 array_meta.fields.insert(
2895 "chunk".to_string(),
2896 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("chunk tensor")),
2897 );
2898 let mut arrays = StructValue::new();
2899 arrays
2900 .fields
2901 .insert("temperature".to_string(), Value::Struct(array_meta));
2902 let mut schema = StructValue::new();
2903 schema
2904 .fields
2905 .insert("arrays".to_string(), Value::Struct(arrays));
2906
2907 let ds = call_builtin(
2908 "data.create",
2909 &[
2910 Value::String(path.clone()),
2911 Value::Struct(schema),
2912 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
2913 ],
2914 )
2915 .expect("create dataset");
2916 let arr = call_builtin(
2917 "Dataset.array",
2918 &[ds, Value::String("temperature".to_string())],
2919 )
2920 .expect("dataset array");
2921
2922 call_builtin(
2923 "DataArray.write",
2924 &[
2925 arr.clone(),
2926 Value::Tensor(
2927 Tensor::new((1..=16).map(|v| v as f64).collect(), vec![4, 4])
2928 .expect("full tensor"),
2929 ),
2930 ],
2931 )
2932 .expect("initial write");
2933
2934 {
2935 let mut keys = uploads.lock().expect("uploads lock");
2936 keys.clear();
2937 }
2938
2939 let slice = Value::Cell(
2940 CellArray::new(
2941 vec![
2942 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
2943 Value::Tensor(Tensor::new(vec![2.0, 3.0], vec![1, 2]).expect("range")),
2944 ],
2945 1,
2946 2,
2947 )
2948 .expect("slice cell"),
2949 );
2950 let rhs =
2951 Value::Tensor(Tensor::new(vec![19.0, 18.0, 17.0, 16.0], vec![2, 2]).expect("rhs"));
2952 call_builtin("DataArray.write", &[arr, slice, rhs]).expect("slice write");
2953
2954 let mut keys = uploads.lock().expect("uploads lock").clone();
2955 keys.sort();
2956 keys.dedup();
2957 assert_eq!(
2958 keys.as_slice(),
2959 [
2960 "0.0".to_string(),
2961 "0.1".to_string(),
2962 "1.0".to_string(),
2963 "1.1".to_string(),
2964 ]
2965 .as_slice()
2966 );
2967
2968 let _ = shutdown_tx.send(());
2969 drop(runtime);
2970 }
2971
2972 #[test]
2973 fn tx_create_resize_fill_and_delete_array() {
2974 let _serial = serial_test_guard();
2975 let dir = tempfile::tempdir().expect("tempdir");
2976 let path = dir.path().join("tx-ops.data").to_string_lossy().to_string();
2977
2978 let mut arrays = StructValue::new();
2979 let mut array_meta = StructValue::new();
2980 array_meta
2981 .fields
2982 .insert("dtype".to_string(), Value::String("f64".to_string()));
2983 array_meta.fields.insert(
2984 "shape".to_string(),
2985 Value::Tensor(Tensor::new(vec![1.0, 1.0], vec![1, 2]).expect("shape tensor")),
2986 );
2987 arrays
2988 .fields
2989 .insert("base".to_string(), Value::Struct(array_meta));
2990 let mut schema = StructValue::new();
2991 schema
2992 .fields
2993 .insert("arrays".to_string(), Value::Struct(arrays));
2994
2995 let ds = call_builtin(
2996 "data.create",
2997 &[
2998 Value::String(path.clone()),
2999 Value::Struct(schema),
3000 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3001 ],
3002 )
3003 .expect("create dataset");
3004
3005 let tx = call_builtin("Dataset.begin", &[ds]).expect("begin tx");
3006 let mut new_meta = StructValue::new();
3007 new_meta
3008 .fields
3009 .insert("dtype".to_string(), Value::String("f64".to_string()));
3010 new_meta.fields.insert(
3011 "shape".to_string(),
3012 Value::Tensor(Tensor::new(vec![2.0, 2.0], vec![1, 2]).expect("shape tensor")),
3013 );
3014 call_builtin(
3015 "DataTransaction.create_array",
3016 &[
3017 tx.clone(),
3018 Value::String("new_array".to_string()),
3019 Value::Struct(new_meta),
3020 ],
3021 )
3022 .expect("create array in tx");
3023 call_builtin(
3024 "DataTransaction.resize",
3025 &[
3026 tx.clone(),
3027 Value::String("new_array".to_string()),
3028 Value::Tensor(Tensor::new(vec![3.0, 1.0], vec![1, 2]).expect("shape tensor")),
3029 ],
3030 )
3031 .expect("resize array in tx");
3032 call_builtin(
3033 "DataTransaction.fill",
3034 &[
3035 tx.clone(),
3036 Value::String("new_array".to_string()),
3037 Value::Num(7.0),
3038 ],
3039 )
3040 .expect("fill array in tx");
3041 call_builtin(
3042 "DataTransaction.delete_array",
3043 &[tx.clone(), Value::String("base".to_string())],
3044 )
3045 .expect("delete array in tx");
3046 call_builtin("DataTransaction.commit", &[tx]).expect("commit tx");
3047
3048 let ds = call_builtin(
3049 "data.open",
3050 &[
3051 Value::String(path),
3052 Value::Cell(CellArray::new(vec![], 1, 0).expect("cell")),
3053 ],
3054 )
3055 .expect("open dataset");
3056 let has_base = call_builtin(
3057 "Dataset.has_array",
3058 &[ds.clone(), Value::String("base".to_string())],
3059 )
3060 .expect("has base");
3061 assert_eq!(has_base, Value::Bool(false));
3062 let arr = call_builtin(
3063 "Dataset.array",
3064 &[ds, Value::String("new_array".to_string())],
3065 )
3066 .expect("new array");
3067 let read_back = call_builtin("DataArray.read", &[arr]).expect("read array");
3068 let Value::Tensor(t) = read_back else {
3069 panic!("expected tensor");
3070 };
3071 assert_eq!(t.shape, vec![3, 1]);
3072 assert_eq!(t.data, vec![7.0, 7.0, 7.0]);
3073 }
3074}