use std::collections::{BTreeMap, HashMap};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};

use chrono::Utc;
use runmat_builtins::{ObjectInstance, Tensor, Value};
use runmat_filesystem as fs;
use runmat_filesystem::data_contract::{
    DataChunkDescriptor, DataChunkUploadRequest, DataChunkUploadTarget,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};

use crate::{build_runtime_error, BuiltinResult, RuntimeError};

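/// On-disk manifest for a dataset: identity, timestamps, per-array layout
/// metadata, free-form attributes, and the transaction sequence number used
/// for optimistic-concurrency checks.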
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataManifest {
    pub schema_version: u32,
    pub format: String,
    pub dataset_id: String,
    pub name: Option<String>,
    pub created_at: String,
    pub updated_at: String,
    pub arrays: BTreeMap<String, DataArrayMeta>,
    pub attrs: BTreeMap<String, serde_json::Value>,
    pub txn_sequence: u64,
}

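/// Layout metadata for a single array: element dtype, logical shape, chunk
/// shape, element order, codec name, and dataset-relative payload paths.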
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayMeta {
    pub dtype: String,
    pub shape: Vec<usize>,
    pub chunk_shape: Vec<usize>,
    #[serde(default = "default_array_order")]
    pub order: String,
    pub codec: String,
    #[serde(default)]
    pub chunk_index_path: Option<String>,
    pub data_path: String,
}

fn default_array_order() -> String {
    "column_major".to_string()
}

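/// Dense array payload as stored on disk: `values` holds the elements of
/// `shape` flattened in column-major order.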
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayPayload {
    pub dtype: String,
    pub shape: Vec<usize>,
    pub values: Vec<f64>,
}

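/// Per-array index listing every stored chunk object and where it lives.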
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndex {
    pub schema_version: u32,
    pub array: String,
    pub chunks: Vec<DataChunkIndexEntry>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndexEntry {
    pub key: String,
    pub object_id: String,
    pub hash: String,
    pub bytes_raw: u64,
    pub bytes_stored: u64,
    #[serde(default)]
    pub coords: Vec<usize>,
    #[serde(default)]
    pub shape: Vec<usize>,
    pub data_path: String,
}

#[derive(Debug, Clone)]
pub struct DataSchema {
    pub arrays: BTreeMap<String, DataArrayMeta>,
}

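/// In-memory record of an open transaction: staged writes, resizes, fills,
/// array creations/deletions, and attribute updates, held until the
/// transaction is committed or aborted. `base_sequence` records the manifest
/// sequence observed when the transaction began.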
#[derive(Debug, Clone)]
pub struct PendingTxn {
    pub dataset_path: String,
    pub base_sequence: u64,
    pub writes: Vec<PendingWrite>,
    pub resizes: Vec<PendingResize>,
    pub fills: Vec<PendingFill>,
    pub create_arrays: Vec<PendingCreateArray>,
    pub delete_arrays: Vec<String>,
    pub attrs: BTreeMap<String, Value>,
    pub status: TxnStatus,
}

#[derive(Debug, Clone)]
pub struct PendingWrite {
    pub array: String,
    pub slice_spec: Option<Value>,
    pub value: Value,
}

#[derive(Debug, Clone)]
pub struct PendingResize {
    pub array: String,
    pub shape: Vec<usize>,
}

#[derive(Debug, Clone)]
pub struct PendingFill {
    pub array: String,
    pub slice_spec: Option<Value>,
    pub value: Value,
}

#[derive(Debug, Clone)]
pub struct PendingCreateArray {
    pub array: String,
    pub meta: DataArrayMeta,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxnStatus {
    Open,
    Committed,
    Aborted,
}

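/// Process-wide registry of open transactions, lazily initialized on first
/// use and guarded by a mutex.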
fn tx_registry() -> &'static Mutex<HashMap<String, PendingTxn>> {
    static REG: OnceLock<Mutex<HashMap<String, PendingTxn>>> = OnceLock::new();
    REG.get_or_init(|| Mutex::new(HashMap::new()))
}

pub fn data_error(message: impl Into<String>) -> RuntimeError {
    build_runtime_error(message)
        .with_identifier("RUNMAT:Data:Error")
        .with_builtin("data")
        .build()
}

pub fn parse_string(value: &Value, context: &str) -> BuiltinResult<String> {
    match value {
        Value::String(s) => Ok(s.clone()),
        Value::CharArray(ca) => Ok(ca.to_string()),
        _ => Err(data_error(format!("{context}: expected string value"))),
    }
}

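// On-disk layout: `<root>/manifest.json` holds the manifest and
// `<root>/arrays/<name>/` holds each array's dense payload and chunk data.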
pub fn dataset_root(path: &str) -> PathBuf {
    PathBuf::from(path)
}

pub fn manifest_path(root: &Path) -> PathBuf {
    root.join("manifest.json")
}

pub fn arrays_root(root: &Path) -> PathBuf {
    root.join("arrays")
}

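/// Serializes the manifest as pretty-printed JSON and writes it to
/// `<root>/manifest.json`, creating the dataset root if needed.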
pub async fn write_manifest_async(root: &Path, manifest: &DataManifest) -> BuiltinResult<()> {
    fs::create_dir_all_async(root).await.map_err(|err| {
        data_error(format!(
            "failed to create dataset root '{}': {err}",
            root.display()
        ))
    })?;
    let path = manifest_path(root);
    let bytes = serde_json::to_vec_pretty(manifest)
        .map_err(|err| data_error(format!("failed to encode manifest json: {err}")))?;
    fs::write_async(&path, &bytes).await.map_err(|err| {
        data_error(format!(
            "failed to write manifest '{}': {err}",
            path.display()
        ))
    })?;
    Ok(())
}

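/// Reads and parses `<root>/manifest.json`.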
pub async fn read_manifest_async(root: &Path) -> BuiltinResult<DataManifest> {
    let path = manifest_path(root);
    let bytes = fs::read_async(&path).await.map_err(|err| {
        data_error(format!(
            "failed to read manifest '{}': {err}",
            path.display()
        ))
    })?;
    let manifest = serde_json::from_slice::<DataManifest>(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse manifest '{}': {err}",
            path.display()
        ))
    })?;
    Ok(manifest)
}

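/// Writes an array payload in two forms: the full dense payload at
/// `data.f64.json`, plus one JSON object per chunk under `chunks/` together
/// with a chunk index. Returns the payload and chunk-index paths.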
pub async fn write_array_payload_async(
    root: &Path,
    array: &str,
    payload: &DataArrayPayload,
    chunk_shape: &[usize],
) -> BuiltinResult<(PathBuf, PathBuf)> {
    let array_dir = arrays_root(root).join(array);
    fs::create_dir_all_async(&array_dir).await.map_err(|err| {
        data_error(format!(
            "failed to create array dir '{}': {err}",
            array_dir.display()
        ))
    })?;
    let payload_path = array_dir.join("data.f64.json");
    let bytes = serde_json::to_vec(payload)
        .map_err(|err| data_error(format!("failed to encode array payload json: {err}")))?;
    fs::write_async(&payload_path, &bytes)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to write payload '{}': {err}",
                payload_path.display()
            ))
        })?;

    let chunk_dir = array_dir.join("chunks");
    fs::create_dir_all_async(&chunk_dir).await.map_err(|err| {
        data_error(format!(
            "failed to create chunk dir '{}': {err}",
            chunk_dir.display()
        ))
    })?;

    let mut index = DataChunkIndex {
        schema_version: 1,
        array: array.to_string(),
        chunks: Vec::new(),
    };
    let mut upload_chunks = Vec::new();
    let grid_shape = chunk_grid_shape(&payload.shape, chunk_shape);
    // Arrays with a zero-length axis have no elements to chunk; skip chunk
    // materialization so the traversal below never probes an out-of-bounds
    // coordinate.
    if payload.shape.iter().all(|&extent| extent > 0) {
        let mut coords = vec![0usize; payload.shape.len()];
        loop {
            let chunk_start = chunk_start_for_coords(&coords, chunk_shape);
            let chunk_extent = chunk_extent_for_start(&chunk_start, chunk_shape, &payload.shape);
            let chunk_payload = DataArrayPayload {
                dtype: payload.dtype.clone(),
                shape: chunk_extent.clone(),
                values: collect_chunk_values(payload, &chunk_start, &chunk_extent)?,
            };
            let key = chunk_key(&coords);
            let object_id = format!("obj_{}", key.replace('.', "_"));
            let chunk_bytes = serde_json::to_vec(&chunk_payload)
                .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
            let data_path = chunk_dir.join(format!("{object_id}.json"));
            fs::write_async(&data_path, &chunk_bytes)
                .await
                .map_err(|err| {
                    data_error(format!(
                        "failed to write chunk '{}': {err}",
                        data_path.display()
                    ))
                })?;
            let hash = sha256_hex(&chunk_bytes);
            let rel_chunk_path = data_path
                .strip_prefix(root)
                .map_err(|err| {
                    data_error(format!("failed to compute chunk relative path: {err}"))
                })?
                .to_string_lossy()
                .to_string();
            index.chunks.push(DataChunkIndexEntry {
                key: key.clone(),
                object_id: object_id.clone(),
                hash: hash.clone(),
                // Chunks are stored as uncompressed JSON, so the raw and
                // stored sizes are identical.
                bytes_raw: chunk_bytes.len() as u64,
                bytes_stored: chunk_bytes.len() as u64,
                coords: coords.clone(),
                shape: chunk_extent,
                data_path: rel_chunk_path,
            });
            upload_chunks.push((
                DataChunkDescriptor {
                    key,
                    object_id,
                    hash,
                    bytes_raw: chunk_bytes.len() as u64,
                    bytes_stored: chunk_bytes.len() as u64,
                },
                chunk_bytes,
            ));
            if !advance_index(&mut coords, &grid_shape) {
                break;
            }
        }
    }

    maybe_upload_chunks_async(root, array, upload_chunks).await?;

    tracing::info!(
        target: "runmat.data",
        dataset = %root.display(),
        array = array,
        chunks = index.chunks.len(),
        payload_bytes = bytes.len(),
        "data array chunks written"
    );

    let chunk_index_path = chunk_dir.join("index.json");
    let chunk_index_bytes = serde_json::to_vec(&index)
        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
    fs::write_async(&chunk_index_path, &chunk_index_bytes)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to write chunk index '{}': {err}",
                chunk_index_path.display()
            ))
        })?;
    Ok((payload_path, chunk_index_path))
}

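/// Reads a full array payload, preferring chunk reassembly when a chunk
/// index file is present and falling back to the dense payload otherwise.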
pub async fn read_array_payload_async(
    root: &Path,
    meta: &DataArrayMeta,
) -> BuiltinResult<DataArrayPayload> {
    if let Some(index_path) = &meta.chunk_index_path {
        let path = root.join(index_path);
        if fs::metadata_async(&path).await.is_ok() {
            return read_array_payload_chunked_async(root, meta, &path).await;
        }
    }
    let payload_path = root.join(&meta.data_path);
    let bytes = fs::read_async(&payload_path).await.map_err(|err| {
        data_error(format!(
            "failed to read payload '{}': {err}",
            payload_path.display()
        ))
    })?;
    serde_json::from_slice::<DataArrayPayload>(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse payload '{}': {err}",
            payload_path.display()
        ))
    })
}

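/// Reads a rectangular slice of an array. Bounds are clamped via
/// `normalize_slice_bounds`; chunked datasets read only the intersecting
/// chunks, otherwise the slice is extracted from the full payload.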
pub async fn read_array_slice_payload_async(
    root: &Path,
    meta: &DataArrayMeta,
    start: &[usize],
    shape: &[usize],
) -> BuiltinResult<DataArrayPayload> {
    let (slice_start, slice_shape) = normalize_slice_bounds(&meta.shape, start, shape)?;
    if let Some(index_path) = &meta.chunk_index_path {
        let path = root.join(index_path);
        if fs::metadata_async(&path).await.is_ok() {
            return read_array_payload_chunked_slice_async(
                root,
                meta,
                &path,
                &slice_start,
                &slice_shape,
            )
            .await;
        }
    }
    let full = read_array_payload_async(root, meta).await?;
    extract_slice_payload(&full, &slice_start, &slice_shape)
}

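/// Assembles a slice by visiting every chunk listed in the index, skipping
/// chunks that do not intersect the requested region, and copying the
/// overlapping elements into a freshly zeroed buffer.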
async fn read_array_payload_chunked_slice_async(
    root: &Path,
    meta: &DataArrayMeta,
    index_path: &Path,
    slice_start: &[usize],
    slice_shape: &[usize],
) -> BuiltinResult<DataArrayPayload> {
    let bytes = fs::read_async(index_path).await.map_err(|err| {
        data_error(format!(
            "failed to read chunk index '{}': {err}",
            index_path.display()
        ))
    })?;
    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse chunk index '{}': {err}",
            index_path.display()
        ))
    })?;

    let mut values = vec![0.0; slice_shape.iter().copied().product::<usize>()];
    for chunk in index.chunks {
        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
        let chunk_extent = if chunk.shape.is_empty() {
            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
        } else {
            chunk.shape.clone()
        };
        if !chunk_intersects_slice(&chunk_start, &chunk_extent, slice_start, slice_shape) {
            continue;
        }

        let chunk_path = root.join(&chunk.data_path);
        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
            data_error(format!(
                "failed to read chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
            data_error(format!(
                "failed to parse chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        if payload.shape != chunk_extent {
            return Err(data_error(format!(
                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
                chunk.key, payload.shape, chunk_extent
            )));
        }

        let mut local = vec![0usize; chunk_extent.len()];
        loop {
            let mut global = Vec::with_capacity(chunk_extent.len());
            for dim in 0..chunk_extent.len() {
                global.push(chunk_start[dim] + local[dim]);
            }
            if coordinate_in_slice(&global, slice_start, slice_shape) {
                let src_linear = linear_index_column_major(&local, &chunk_extent)?;
                let mut dst = Vec::with_capacity(slice_shape.len());
                for dim in 0..slice_shape.len() {
                    dst.push(global[dim].saturating_sub(slice_start[dim]));
                }
                let dst_linear = linear_index_column_major(&dst, slice_shape)?;
                values[dst_linear] = payload.values[src_linear];
            }
            if !advance_index(&mut local, &chunk_extent) {
                break;
            }
        }
    }

    Ok(DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: slice_shape.to_vec(),
        values,
    })
}

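/// Reassembles the full array from its chunk index by scattering each
/// chunk's column-major values into the dense output buffer.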
async fn read_array_payload_chunked_async(
    root: &Path,
    meta: &DataArrayMeta,
    index_path: &Path,
) -> BuiltinResult<DataArrayPayload> {
    let bytes = fs::read_async(index_path).await.map_err(|err| {
        data_error(format!(
            "failed to read chunk index '{}': {err}",
            index_path.display()
        ))
    })?;
    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse chunk index '{}': {err}",
            index_path.display()
        ))
    })?;
    let mut values = vec![0.0; meta.shape.iter().copied().product::<usize>()];
    for chunk in index.chunks {
        let chunk_path = root.join(&chunk.data_path);
        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
            data_error(format!(
                "failed to read chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
            data_error(format!(
                "failed to parse chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
        let chunk_extent = if chunk.shape.is_empty() {
            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
        } else {
            chunk.shape.clone()
        };
        if payload.shape != chunk_extent {
            return Err(data_error(format!(
                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
                chunk.key, payload.shape, chunk_extent
            )));
        }
        let mut local = vec![0usize; chunk_extent.len()];
        loop {
            let mut global = Vec::with_capacity(chunk_extent.len());
            for dim in 0..chunk_extent.len() {
                global.push(chunk_start[dim] + local[dim]);
            }
            let src_linear = linear_index_column_major(&local, &chunk_extent)?;
            let dst_linear = linear_index_column_major(&global, &meta.shape)?;
            values[dst_linear] = payload.values[src_linear];
            if !advance_index(&mut local, &chunk_extent) {
                break;
            }
        }
    }
    Ok(DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: meta.shape.clone(),
        values,
    })
}

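/// Requests upload targets for the written chunks and uploads each one.
/// Backends that report `ErrorKind::Unsupported` are treated as local-only
/// and the upload is silently skipped.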
async fn maybe_upload_chunks_async(
    root: &Path,
    array: &str,
    chunks: Vec<(DataChunkDescriptor, Vec<u8>)>,
) -> BuiltinResult<()> {
    if chunks.is_empty() {
        return Ok(());
    }
    let request = DataChunkUploadRequest {
        dataset_path: root.to_string_lossy().to_string(),
        array: array.to_string(),
        chunks: chunks.iter().map(|(desc, _)| desc.clone()).collect(),
    };
    let targets = match fs::data_chunk_upload_targets_async(&request).await {
        Ok(targets) => targets,
        Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
        Err(err) => {
            return Err(data_error(format!(
                "failed to request data chunk upload targets: {err}"
            )))
        }
    };
    for (descriptor, bytes) in chunks {
        let target = find_chunk_target(&targets, &descriptor.key)?;
        fs::data_upload_chunk_async(target, &bytes)
            .await
            .map_err(|err| {
                data_error(format!(
                    "failed to upload chunk '{}': {err}",
                    descriptor.key
                ))
            })?;
        tracing::info!(
            target: "runmat.data",
            dataset = %root.display(),
            array = array,
            chunk_key = descriptor.key,
            bytes = bytes.len(),
            "data chunk uploaded"
        );
    }
    Ok(())
}

fn find_chunk_target<'a>(
    targets: &'a [DataChunkUploadTarget],
    key: &str,
) -> BuiltinResult<&'a DataChunkUploadTarget> {
    targets
        .iter()
        .find(|target| target.key == key)
        .ok_or_else(|| data_error(format!("missing upload target for chunk '{key}'")))
}

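/// Hex-encoded SHA-256 digest with a `sha256:` prefix.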
pub fn sha256_hex(bytes: &[u8]) -> String {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    let digest = hasher.finalize();
    format!("sha256:{:x}", digest)
}

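/// Chunk key as dot-separated grid coordinates, e.g. coords `[2, 0, 1]`
/// produce the key `"2.0.1"`.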
fn chunk_key(coords: &[usize]) -> String {
    coords
        .iter()
        .map(|v| v.to_string())
        .collect::<Vec<_>>()
        .join(".")
}

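/// Number of chunks along each axis: the per-axis ceiling of the array
/// extent divided by the chunk extent (missing chunk dims default to 1).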
fn chunk_grid_shape(shape: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    shape
        .iter()
        .enumerate()
        .map(|(idx, extent)| {
            let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
            extent.div_ceil(chunk)
        })
        .collect()
}

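/// Element-space start offset of the chunk at the given grid coordinates.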
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    coords
        .iter()
        .enumerate()
        .map(|(idx, coord)| coord * chunk_shape.get(idx).copied().unwrap_or(1).max(1))
        .collect()
}

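/// Extent of the chunk beginning at `start`, clipped to the array bounds so
/// edge chunks may be smaller than the nominal chunk shape.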
fn chunk_extent_for_start(
    start: &[usize],
    chunk_shape: &[usize],
    full_shape: &[usize],
) -> Vec<usize> {
    start
        .iter()
        .enumerate()
        .map(|(idx, start)| {
            let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
            let end = (*start + chunk).min(full_shape[idx]);
            end.saturating_sub(*start)
        })
        .collect()
}

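/// Gathers one chunk's values from the dense payload, walking the chunk in
/// column-major order.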
fn collect_chunk_values(
    payload: &DataArrayPayload,
    chunk_start: &[usize],
    chunk_extent: &[usize],
) -> BuiltinResult<Vec<f64>> {
    let mut local = vec![0usize; chunk_extent.len()];
    let mut values = Vec::with_capacity(chunk_extent.iter().copied().product());
    loop {
        let mut global = Vec::with_capacity(chunk_extent.len());
        for dim in 0..chunk_extent.len() {
            global.push(chunk_start[dim] + local[dim]);
        }
        let linear = linear_index_column_major(&global, &payload.shape)?;
        values.push(payload.values[linear]);
        if !advance_index(&mut local, chunk_extent) {
            break;
        }
    }
    Ok(values)
}

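/// Grid coordinates for an index entry: explicit `coords` when present,
/// otherwise parsed from the dot-separated chunk key.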
fn chunk_coords_from_entry(entry: &DataChunkIndexEntry, rank: usize) -> BuiltinResult<Vec<usize>> {
    if !entry.coords.is_empty() {
        if entry.coords.len() != rank {
            return Err(data_error(format!(
                "chunk coords rank mismatch for key '{}': expected {rank}, got {}",
                entry.key,
                entry.coords.len()
            )));
        }
        return Ok(entry.coords.clone());
    }
    let coords = entry
        .key
        .split('.')
        .map(|part| {
            part.parse::<usize>()
                .map_err(|_| data_error(format!("invalid chunk key '{}'", entry.key)))
        })
        .collect::<BuiltinResult<Vec<_>>>()?;
    if coords.len() != rank {
        return Err(data_error(format!(
            "chunk key rank mismatch for key '{}': expected {rank}, got {}",
            entry.key,
            coords.len()
        )));
    }
    Ok(coords)
}

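/// Clamps a requested slice to the array bounds: the start is clamped to the
/// last valid index and the span to at least one element within the axis.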
fn normalize_slice_bounds(
    full_shape: &[usize],
    start: &[usize],
    shape: &[usize],
) -> BuiltinResult<(Vec<usize>, Vec<usize>)> {
    if full_shape.is_empty() {
        return Ok((Vec::new(), Vec::new()));
    }
    let mut normalized_start = Vec::with_capacity(full_shape.len());
    let mut normalized_shape = Vec::with_capacity(full_shape.len());
    for (axis, axis_len) in full_shape.iter().copied().enumerate() {
        if axis_len == 0 {
            return Err(data_error("slice axis length must be greater than zero"));
        }
        let requested_start = start.get(axis).copied().unwrap_or(0);
        let clamped_start = requested_start.min(axis_len.saturating_sub(1));
        let requested_span = shape.get(axis).copied().unwrap_or(axis_len);
        let clamped_span = requested_span
            .max(1)
            .min(axis_len.saturating_sub(clamped_start));
        normalized_start.push(clamped_start);
        normalized_shape.push(clamped_span);
    }
    Ok((normalized_start, normalized_shape))
}

fn coordinate_in_slice(global: &[usize], slice_start: &[usize], slice_shape: &[usize]) -> bool {
    for dim in 0..slice_shape.len() {
        let start = slice_start[dim];
        let end = start.saturating_add(slice_shape[dim]);
        let value = global[dim];
        if value < start || value >= end {
            return false;
        }
    }
    true
}

fn chunk_intersects_slice(
    chunk_start: &[usize],
    chunk_extent: &[usize],
    slice_start: &[usize],
    slice_shape: &[usize],
) -> bool {
    for dim in 0..slice_shape.len() {
        let chunk_lo = chunk_start[dim];
        let chunk_hi = chunk_lo.saturating_add(chunk_extent[dim]);
        let slice_lo = slice_start[dim];
        let slice_hi = slice_lo.saturating_add(slice_shape[dim]);
        if chunk_hi <= slice_lo || slice_hi <= chunk_lo {
            return false;
        }
    }
    true
}

fn extract_slice_payload(
    payload: &DataArrayPayload,
    start: &[usize],
    shape: &[usize],
) -> BuiltinResult<DataArrayPayload> {
    let mut values = Vec::with_capacity(shape.iter().copied().product());
    if shape.is_empty() {
        return Ok(DataArrayPayload {
            dtype: payload.dtype.clone(),
            shape: Vec::new(),
            values,
        });
    }
    let mut local = vec![0usize; shape.len()];
    loop {
        let mut global = Vec::with_capacity(shape.len());
        for dim in 0..shape.len() {
            global.push(start[dim] + local[dim]);
        }
        let linear = linear_index_column_major(&global, &payload.shape)?;
        values.push(payload.values[linear]);
        if !advance_index(&mut local, shape) {
            break;
        }
    }
    Ok(DataArrayPayload {
        dtype: payload.dtype.clone(),
        shape: shape.to_vec(),
        values,
    })
}

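/// Column-major (first axis fastest) linearization of a multi-dimensional
/// index; e.g. index `[1, 2]` in shape `[3, 4]` maps to `1 + 2 * 3 = 7`.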
fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
    if index.len() != shape.len() {
        return Err(data_error("chunk index rank mismatch"));
    }
    let mut stride = 1usize;
    let mut linear = 0usize;
    for (idx, extent) in index.iter().zip(shape.iter()) {
        if *idx >= *extent {
            return Err(data_error("chunk index out of bounds"));
        }
        linear += idx * stride;
        stride = stride.saturating_mul(*extent);
    }
    Ok(linear)
}

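/// Odometer-style increment over `shape` in column-major order; returns
/// `false` once the index wraps past the final coordinate.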
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    if shape.is_empty() {
        return false;
    }
    for dim in 0..shape.len() {
        index[dim] += 1;
        if index[dim] < shape[dim] {
            return true;
        }
        index[dim] = 0;
    }
    false
}

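/// Parses a `data.create` schema struct. Each entry under `schema.arrays`
/// may set `dtype` (default `"f64"`), `shape` (default `[0, 0]`), `chunk`
/// (defaulted from the shape), and `codec` (default `"zstd"`).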
pub fn parse_schema(schema: &Value) -> BuiltinResult<DataSchema> {
    let Value::Struct(schema_struct) = schema else {
        return Err(data_error("data.create: schema must be a struct"));
    };
    let arrays_value = schema_struct
        .fields
        .get("arrays")
        .ok_or_else(|| data_error("data.create: schema missing 'arrays' field"))?;
    let Value::Struct(arrays_struct) = arrays_value else {
        return Err(data_error("data.create: schema.arrays must be a struct"));
    };

    let mut arrays = BTreeMap::new();
    for (name, meta_value) in &arrays_struct.fields {
        let Value::Struct(meta_struct) = meta_value else {
            return Err(data_error(format!(
                "data.create: schema.arrays.{name} must be a struct"
            )));
        };
        let dtype = meta_struct
            .fields
            .get("dtype")
            .map(|v| parse_string(v, "data.create schema dtype"))
            .transpose()?
            .unwrap_or_else(|| "f64".to_string());
        let shape = meta_struct
            .fields
            .get("shape")
            .map(parse_usize_vector)
            .transpose()?
            .unwrap_or_else(|| vec![0, 0]);
        let chunk_shape = meta_struct
            .fields
            .get("chunk")
            .map(parse_usize_vector)
            .transpose()?
            .unwrap_or_else(|| default_chunk_shape(&shape));
        let codec = meta_struct
            .fields
            .get("codec")
            .map(|v| parse_string(v, "data.create schema codec"))
            .transpose()?
            .unwrap_or_else(|| "zstd".to_string());
        let data_path = format!("arrays/{name}/data.f64.json");
        let chunk_index_path = format!("arrays/{name}/chunks/index.json");
        arrays.insert(
            name.clone(),
            DataArrayMeta {
                dtype,
                shape,
                chunk_shape,
                order: default_array_order(),
                codec,
                chunk_index_path: Some(chunk_index_path),
                data_path,
            },
        );
    }

    Ok(DataSchema { arrays })
}

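/// Heuristic chunk shape: up to 65,536 elements for vectors, otherwise up to
/// 256 along the first two axes and up to 8 along higher axes.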
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    if shape.is_empty() {
        return vec![1024];
    }
    let mut out = shape.to_vec();
    if out.len() == 1 {
        out[0] = out[0].clamp(1, 65_536);
        return out;
    }
    out[0] = out[0].clamp(1, 256);
    out[1] = out[1].clamp(1, 256);
    for dim in out.iter_mut().skip(2) {
        *dim = (*dim).clamp(1, 8);
    }
    out
}

fn parse_usize_vector(value: &Value) -> BuiltinResult<Vec<usize>> {
    match value {
        Value::Tensor(t) => tensor_to_usize_vector(t),
        Value::Num(n) => {
            if *n < 0.0 || !n.is_finite() {
                return Err(data_error(
                    "data schema dimensions must be non-negative finite numbers",
                ));
            }
            Ok(vec![*n as usize])
        }
        Value::Int(i) => {
            let n = i.to_i64();
            if n < 0 {
                return Err(data_error("data schema dimensions must be non-negative"));
            }
            Ok(vec![n as usize])
        }
        _ => Err(data_error(
            "data schema dimension field must be numeric tensor/vector",
        )),
    }
}

fn tensor_to_usize_vector(t: &Tensor) -> BuiltinResult<Vec<usize>> {
    let mut out = Vec::with_capacity(t.data.len());
    for value in &t.data {
        if !value.is_finite() || *value < 0.0 {
            return Err(data_error(
                "data schema dimensions must be non-negative finite numbers",
            ));
        }
        out.push(*value as usize);
    }
    Ok(out)
}

pub fn dataset_object(path: &str, manifest: &DataManifest) -> Value {
    let mut obj = ObjectInstance::new("Dataset".to_string());
    obj.properties
        .insert("__data_path".to_string(), Value::String(path.to_string()));
    obj.properties.insert(
        "__data_id".to_string(),
        Value::String(manifest.dataset_id.clone()),
    );
    obj.properties.insert(
        "__data_version".to_string(),
        Value::String(manifest_version_token(manifest)),
    );
    Value::Object(obj)
}

pub fn manifest_version_token(manifest: &DataManifest) -> String {
    format!("{}:{}", manifest.updated_at, manifest.txn_sequence)
}

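/// Optimistic-concurrency check: fails with `MANIFEST_CONFLICT` when the
/// manifest's transaction sequence no longer matches the expected value.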
pub fn ensure_manifest_sequence(expected: u64, manifest: &DataManifest) -> BuiltinResult<()> {
    if manifest.txn_sequence != expected {
        tracing::warn!(
            target: "runmat.data",
            expected_sequence = expected,
            actual_sequence = manifest.txn_sequence,
            "manifest conflict detected"
        );
        return Err(data_error(
            "MANIFEST_CONFLICT: dataset changed since transaction begin",
        ));
    }
    Ok(())
}

pub fn array_object(dataset_path: &str, array_name: &str) -> Value {
    let mut obj = ObjectInstance::new("DataArray".to_string());
    obj.properties.insert(
        "__data_path".to_string(),
        Value::String(dataset_path.to_string()),
    );
    obj.properties.insert(
        "__array_name".to_string(),
        Value::String(array_name.to_string()),
    );
    Value::Object(obj)
}

pub fn transaction_object(dataset_path: &str, tx_id: &str) -> Value {
    let mut obj = ObjectInstance::new("DataTransaction".to_string());
    obj.properties.insert(
        "__data_path".to_string(),
        Value::String(dataset_path.to_string()),
    );
    obj.properties
        .insert("__tx_id".to_string(), Value::String(tx_id.to_string()));
    Value::Object(obj)
}

pub fn get_object_prop<'a>(obj: &'a ObjectInstance, key: &str) -> BuiltinResult<&'a Value> {
    obj.properties
        .get(key)
        .ok_or_else(|| data_error(format!("object missing internal property '{key}'")))
}

pub fn now_rfc3339() -> String {
    Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
}

pub fn new_dataset_id() -> String {
    static NEXT_DATASET_ID: AtomicU64 = AtomicU64::new(1);
    let seq = NEXT_DATASET_ID.fetch_add(1, Ordering::Relaxed);
    format!("ds_{}_{}", Utc::now().timestamp_millis(), seq)
}

pub fn new_tx_id() -> String {
    static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);
    let seq = NEXT_TX_ID.fetch_add(1, Ordering::Relaxed);
    format!("tx_{}_{}", Utc::now().timestamp_millis(), seq)
}

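/// Registers a new open transaction and returns its generated id.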
pub fn start_tx(dataset_path: String, base_sequence: u64) -> String {
    let tx_id = new_tx_id();
    let pending = PendingTxn {
        dataset_path,
        base_sequence,
        writes: Vec::new(),
        resizes: Vec::new(),
        fills: Vec::new(),
        create_arrays: Vec::new(),
        delete_arrays: Vec::new(),
        attrs: BTreeMap::new(),
        status: TxnStatus::Open,
    };
    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
    guard.insert(tx_id.clone(), pending);
    tx_id
}

pub fn with_tx_mut<T>(
    tx_id: &str,
    f: impl FnOnce(&mut PendingTxn) -> BuiltinResult<T>,
) -> BuiltinResult<T> {
    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
    let tx = guard
        .get_mut(tx_id)
        .ok_or_else(|| data_error(format!("transaction '{tx_id}' not found")))?;
    f(tx)
}

pub fn with_tx<T>(
    tx_id: &str,
    f: impl FnOnce(&PendingTxn) -> BuiltinResult<T>,
) -> BuiltinResult<T> {
    let guard = tx_registry().lock().expect("tx registry lock poisoned");
    let tx = guard
        .get(tx_id)
        .ok_or_else(|| data_error(format!("transaction '{tx_id}' not found")))?;
    f(tx)
}

pub fn remove_tx(tx_id: &str) {
    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
    let _ = guard.remove(tx_id);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ensure_manifest_sequence_accepts_matching_sequence() {
        let manifest = DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: "2026-03-01T00:00:00Z".to_string(),
            updated_at: "2026-03-01T00:00:00Z".to_string(),
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence: 5,
        };
        ensure_manifest_sequence(5, &manifest).expect("expected sequence match");
    }

    #[test]
    fn ensure_manifest_sequence_rejects_conflict() {
        let manifest = DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: "2026-03-01T00:00:00Z".to_string(),
            updated_at: "2026-03-01T00:00:00Z".to_string(),
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence: 6,
        };
        let err = ensure_manifest_sequence(5, &manifest).expect_err("expected conflict error");
        assert!(err.message().contains("MANIFEST_CONFLICT"));
    }

    #[test]
    fn transaction_registry_roundtrip() {
        let tx_id = start_tx("/datasets/test.data".to_string(), 7);
        let status = with_tx(&tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
        assert_eq!(status, TxnStatus::Open);
        remove_tx(&tx_id);
        let err = with_tx(&tx_id, |_| Ok(())).expect_err("expected missing tx");
        assert!(err.message().contains("not found"));
    }

    #[test]
    fn sha256_hash_format_matches_expected_prefix() {
        let hash = sha256_hex(b"runmat");
        assert!(hash.starts_with("sha256:"));
        assert_eq!(hash.len(), "sha256:".len() + 64);
    }
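
    // Sketch of a geometry sanity check for the private chunk helpers; the
    // 10x10 array and 4x4 chunk shape are illustrative values, not a dataset
    // fixture from elsewhere in the codebase.
    #[test]
    fn chunk_geometry_helpers_clip_edge_chunks() {
        let shape = [10usize, 10];
        let chunk = [4usize, 4];
        // ceil(10 / 4) = 3 chunks along each axis.
        assert_eq!(chunk_grid_shape(&shape, &chunk), vec![3, 3]);
        // Grid coordinate (2, 2) starts at elements (8, 8) and is clipped
        // from the nominal 4x4 down to the 2x2 remainder.
        assert_eq!(chunk_start_for_coords(&[2, 2], &chunk), vec![8, 8]);
        assert_eq!(chunk_extent_for_start(&[8, 8], &chunk, &shape), vec![2, 2]);
        // Keys are dot-separated grid coordinates.
        assert_eq!(chunk_key(&[2, 0, 1]), "2.0.1");
        // Column-major linearization: the first axis varies fastest.
        assert_eq!(linear_index_column_major(&[1, 2], &[3, 4]).unwrap(), 7);
    }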
}