1use std::collections::{BTreeMap, HashMap};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6use chrono::Utc;
7use runmat_builtins::{ObjectInstance, Tensor, Value};
8use runmat_filesystem as fs;
9use runmat_filesystem::data_contract::{
10 DataChunkDescriptor, DataChunkUploadRequest, DataChunkUploadTarget,
11};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14
15use crate::{build_runtime_error, BuiltinResult, RuntimeError};
16
/// Dataset manifest, serialized as pretty-printed JSON to `manifest.json`
/// under the dataset root (see `write_manifest_async` / `read_manifest_async`).
///
/// Field order matters: the derived `Serialize` impl emits fields in
/// declaration order, so reordering them changes the bytes written to disk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataManifest {
    /// Version number of the manifest layout.
    pub schema_version: u32,
    /// Format tag of the dataset (the unit tests use `"runmat-data"`).
    pub format: String,
    /// Dataset identifier; presumably produced by `new_dataset_id` — confirm at the creation site.
    pub dataset_id: String,
    /// Optional human-readable dataset name.
    pub name: Option<String>,
    /// Creation timestamp string.
    pub created_at: String,
    /// Last-update timestamp string; combined with `txn_sequence` by
    /// `manifest_version_token`.
    pub updated_at: String,
    /// Per-array metadata keyed by array name.
    pub arrays: BTreeMap<String, DataArrayMeta>,
    /// Free-form dataset attributes stored as raw JSON values.
    pub attrs: BTreeMap<String, serde_json::Value>,
    /// Sequence number compared by `ensure_manifest_sequence` to detect that
    /// the dataset changed since a transaction began.
    pub txn_sequence: u64,
}
29
/// Metadata describing one array stored inside a dataset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayMeta {
    /// Element type name (`parse_schema` defaults it to `"f64"`).
    pub dtype: String,
    /// Full extent of the array along each axis.
    pub shape: Vec<usize>,
    /// Chunk extent along each axis; missing or zero entries are treated as 1
    /// by the chunk-grid helpers.
    pub chunk_shape: Vec<usize>,
    /// Linear element order; defaults to `"column_major"` when absent, which
    /// is the order the chunk helpers use for indexing.
    #[serde(default = "default_array_order")]
    pub order: String,
    /// Codec name recorded in metadata. NOTE(review): the write path in this
    /// file stores chunks as plain JSON without applying any codec.
    pub codec: String,
    /// Dataset-relative path of the chunk index. Readers fall back to
    /// `data_path` when this is `None` or the index file does not exist.
    #[serde(default)]
    pub chunk_index_path: Option<String>,
    /// Dataset-relative path of the whole-array JSON payload.
    pub data_path: String,
}
42
/// Serde default for `DataArrayMeta::order`: arrays are column-major unless
/// the stored metadata says otherwise.
fn default_array_order() -> String {
    String::from("column_major")
}
46
/// Array contents as stored in JSON: either a whole array (`data.f64.json`)
/// or a single chunk file.
///
/// `values` is indexed in column-major order by the chunk helpers, and its
/// length is expected to match the product of `shape`. An empty `shape`
/// (rank 0) is treated as a single element by the write path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayPayload {
    /// Element type name, copied from the array metadata.
    pub dtype: String,
    /// Extent along each axis of the data held in `values`.
    pub shape: Vec<usize>,
    /// Element values in column-major order.
    pub values: Vec<f64>,
}
53
/// Per-array chunk index, written to `arrays/<name>/chunks/index.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndex {
    /// Index layout version (the writer emits 1).
    pub schema_version: u32,
    /// Name of the array these chunks belong to.
    pub array: String,
    /// One entry per chunk. The writer emits them in grid order with the
    /// first axis varying fastest.
    pub chunks: Vec<DataChunkIndexEntry>,
}
60
/// Index record describing a single chunk file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndexEntry {
    /// Dot-joined chunk grid coordinates, e.g. `"0.1"`.
    pub key: String,
    /// Object name derived from `key` (`obj_` + key with `.` replaced by `_`);
    /// also used as the chunk file stem.
    pub object_id: String,
    /// `sha256:`-prefixed hex digest of the chunk bytes.
    pub hash: String,
    /// Size of the encoded chunk in bytes.
    pub bytes_raw: u64,
    /// Stored size in bytes (the writer sets it equal to `bytes_raw`).
    pub bytes_stored: u64,
    /// Chunk grid coordinates. Defaults to empty when absent, in which case
    /// readers parse them from `key`.
    #[serde(default)]
    pub coords: Vec<usize>,
    /// Chunk extent per axis. Defaults to empty when absent, in which case
    /// readers recompute it from the array's chunk shape.
    #[serde(default)]
    pub shape: Vec<usize>,
    /// Dataset-relative path of the chunk JSON payload.
    pub data_path: String,
}
74
/// Result of `parse_schema`: per-array metadata keyed by array name.
#[derive(Debug, Clone)]
pub struct DataSchema {
    /// Array metadata keyed by array name.
    pub arrays: BTreeMap<String, DataArrayMeta>,
}
79
/// A transaction whose operations are staged in memory, held in the
/// process-wide registry created by `start_tx`.
#[derive(Debug, Clone)]
pub struct PendingTxn {
    /// Path of the dataset the transaction targets.
    pub dataset_path: String,
    /// Manifest `txn_sequence` observed when the transaction began; presumably
    /// checked with `ensure_manifest_sequence` at commit time (commit code not shown here).
    pub base_sequence: u64,
    /// Staged array writes.
    pub writes: Vec<PendingWrite>,
    /// Staged array resizes.
    pub resizes: Vec<PendingResize>,
    /// Staged fill operations.
    pub fills: Vec<PendingFill>,
    /// Arrays to be created.
    pub create_arrays: Vec<PendingCreateArray>,
    /// Names of arrays to be deleted.
    pub delete_arrays: Vec<String>,
    /// Staged dataset attributes.
    pub attrs: BTreeMap<String, Value>,
    /// Lifecycle state (`start_tx` creates transactions as `Open`).
    pub status: TxnStatus,
}
92
/// A staged write of `value` into `array`, optionally limited by `slice_spec`.
#[derive(Debug, Clone)]
pub struct PendingWrite {
    /// Target array name.
    pub array: String,
    /// Optional slice selector; `None` presumably means the whole array — confirm with the commit code.
    pub slice_spec: Option<Value>,
    /// Value to write.
    pub value: Value,
}

/// A staged change of `array`'s shape.
#[derive(Debug, Clone)]
pub struct PendingResize {
    /// Target array name.
    pub array: String,
    /// New extent along each axis.
    pub shape: Vec<usize>,
}

/// A staged fill operation on `array` (presumably setting the selected region
/// to `value`; verify against the commit code).
#[derive(Debug, Clone)]
pub struct PendingFill {
    /// Target array name.
    pub array: String,
    /// Optional slice selector for the region to fill.
    pub slice_spec: Option<Value>,
    /// Fill value.
    pub value: Value,
}

/// A staged creation of a new array with the given metadata.
#[derive(Debug, Clone)]
pub struct PendingCreateArray {
    /// Name of the array to create.
    pub array: String,
    /// Metadata for the new array.
    pub meta: DataArrayMeta,
}

/// Lifecycle state of a `PendingTxn`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxnStatus {
    Open,
    Committed,
    Aborted,
}
125
126fn tx_registry() -> &'static Mutex<HashMap<String, PendingTxn>> {
127 static REG: OnceLock<Mutex<HashMap<String, PendingTxn>>> = OnceLock::new();
128 REG.get_or_init(|| Mutex::new(HashMap::new()))
129}
130
131pub fn data_error(message: impl Into<String>) -> RuntimeError {
132 build_runtime_error(message)
133 .with_identifier("RUNMAT:Data:Error")
134 .with_builtin("data")
135 .build()
136}
137
138fn data_error_with_identifier(
139 message: impl Into<String>,
140 identifier: &'static str,
141) -> RuntimeError {
142 build_runtime_error(message)
143 .with_identifier(identifier)
144 .with_builtin("data")
145 .build()
146}
147
148const DATA_MANIFEST_CONFLICT_IDENTIFIER: &str = "RunMat:data:ManifestConflict";
149const DATA_TRANSACTION_NOT_FOUND_IDENTIFIER: &str = "RunMat:data:TransactionNotFound";
150
151pub fn parse_string(value: &Value, context: &str) -> BuiltinResult<String> {
152 match value {
153 Value::String(s) => Ok(s.clone()),
154 Value::CharArray(ca) => Ok(ca.to_string()),
155 _ => Err(data_error(format!("{context}: expected string value"))),
156 }
157}
158
/// Interprets a user-supplied dataset path as a filesystem path.
pub fn dataset_root(path: &str) -> PathBuf {
    Path::new(path).to_path_buf()
}

/// Location of the manifest file inside a dataset root.
pub fn manifest_path(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("manifest.json");
    path
}

/// Directory under a dataset root that holds one subdirectory per array.
pub fn arrays_root(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("arrays");
    path
}
170
171pub async fn write_manifest_async(root: &Path, manifest: &DataManifest) -> BuiltinResult<()> {
172 fs::create_dir_all_async(root).await.map_err(|err| {
173 data_error(format!(
174 "failed to create dataset root '{}': {err}",
175 root.display()
176 ))
177 })?;
178 let path = manifest_path(root);
179 let bytes = serde_json::to_vec_pretty(manifest)
180 .map_err(|err| data_error(format!("failed to encode manifest json: {err}")))?;
181 fs::write_async(&path, &bytes).await.map_err(|err| {
182 data_error(format!(
183 "failed to write manifest '{}': {err}",
184 path.display()
185 ))
186 })?;
187 Ok(())
188}
189
190pub async fn read_manifest_async(root: &Path) -> BuiltinResult<DataManifest> {
191 let path = manifest_path(root);
192 let bytes = fs::read_async(&path).await.map_err(|err| {
193 data_error(format!(
194 "failed to read manifest '{}': {err}",
195 path.display()
196 ))
197 })?;
198 let manifest = serde_json::from_slice::<DataManifest>(&bytes).map_err(|err| {
199 data_error(format!(
200 "failed to parse manifest '{}': {err}",
201 path.display()
202 ))
203 })?;
204 Ok(manifest)
205}
206
207pub async fn write_array_payload_async(
208 root: &Path,
209 array: &str,
210 payload: &DataArrayPayload,
211 chunk_shape: &[usize],
212) -> BuiltinResult<(PathBuf, PathBuf)> {
213 let array_dir = arrays_root(root).join(array);
214 fs::create_dir_all_async(&array_dir).await.map_err(|err| {
215 data_error(format!(
216 "failed to create array dir '{}': {err}",
217 array_dir.display()
218 ))
219 })?;
220 let payload_path = array_dir.join("data.f64.json");
221 let bytes = serde_json::to_vec(payload)
222 .map_err(|err| data_error(format!("failed to encode array payload json: {err}")))?;
223 fs::write_async(&payload_path, &bytes)
224 .await
225 .map_err(|err| {
226 data_error(format!(
227 "failed to write payload '{}': {err}",
228 payload_path.display()
229 ))
230 })?;
231
232 let chunk_dir = array_dir.join("chunks");
233 fs::create_dir_all_async(&chunk_dir).await.map_err(|err| {
234 data_error(format!(
235 "failed to create chunk dir '{}': {err}",
236 chunk_dir.display()
237 ))
238 })?;
239
240 let mut index = DataChunkIndex {
241 schema_version: 1,
242 array: array.to_string(),
243 chunks: Vec::new(),
244 };
245 let mut upload_chunks = Vec::new();
246 let grid_shape = chunk_grid_shape(&payload.shape, chunk_shape);
247 let mut coords = vec![0usize; payload.shape.len()];
248 loop {
249 let chunk_start = chunk_start_for_coords(&coords, chunk_shape);
250 let chunk_extent = chunk_extent_for_start(&chunk_start, chunk_shape, &payload.shape);
251 let chunk_payload = DataArrayPayload {
252 dtype: payload.dtype.clone(),
253 shape: chunk_extent.clone(),
254 values: collect_chunk_values(payload, &chunk_start, &chunk_extent)?,
255 };
256 let key = chunk_key(&coords);
257 let object_id = format!("obj_{}", key.replace('.', "_"));
258 let chunk_bytes = serde_json::to_vec(&chunk_payload)
259 .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
260 let data_path = chunk_dir.join(format!("{object_id}.json"));
261 fs::write_async(&data_path, &chunk_bytes)
262 .await
263 .map_err(|err| {
264 data_error(format!(
265 "failed to write chunk '{}': {err}",
266 data_path.display()
267 ))
268 })?;
269 let hash = sha256_hex(&chunk_bytes);
270 let rel_chunk_path = data_path
271 .strip_prefix(root)
272 .map_err(|err| data_error(format!("failed to compute chunk relative path: {err}")))?
273 .to_string_lossy()
274 .to_string();
275 index.chunks.push(DataChunkIndexEntry {
276 key: key.clone(),
277 object_id: object_id.clone(),
278 hash: hash.clone(),
279 bytes_raw: chunk_bytes.len() as u64,
280 bytes_stored: chunk_bytes.len() as u64,
281 coords: coords.clone(),
282 shape: chunk_extent,
283 data_path: rel_chunk_path,
284 });
285 upload_chunks.push((
286 DataChunkDescriptor {
287 key,
288 object_id,
289 hash,
290 bytes_raw: chunk_bytes.len() as u64,
291 bytes_stored: chunk_bytes.len() as u64,
292 },
293 chunk_bytes,
294 ));
295 if !advance_index(&mut coords, &grid_shape) {
296 break;
297 }
298 }
299
300 maybe_upload_chunks_async(root, array, upload_chunks).await?;
301
302 tracing::info!(
303 target: "runmat.data",
304 dataset = %root.display(),
305 array = array,
306 chunks = index.chunks.len(),
307 payload_bytes = bytes.len(),
308 "data chunk write planned"
309 );
310
311 let chunk_index_path = chunk_dir.join("index.json");
312 let chunk_index_bytes = serde_json::to_vec(&index)
313 .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
314 fs::write_async(&chunk_index_path, &chunk_index_bytes)
315 .await
316 .map_err(|err| {
317 data_error(format!(
318 "failed to write chunk index '{}': {err}",
319 chunk_index_path.display()
320 ))
321 })?;
322 Ok((payload_path, chunk_index_path))
323}
324
325pub async fn read_array_payload_async(
326 root: &Path,
327 meta: &DataArrayMeta,
328) -> BuiltinResult<DataArrayPayload> {
329 if let Some(index_path) = &meta.chunk_index_path {
330 let path = root.join(index_path);
331 if fs::metadata_async(&path).await.is_ok() {
332 return read_array_payload_chunked_async(root, meta, &path).await;
333 }
334 }
335 let payload_path = root.join(&meta.data_path);
336 let bytes = fs::read_async(&payload_path).await.map_err(|err| {
337 data_error(format!(
338 "failed to read payload '{}': {err}",
339 payload_path.display()
340 ))
341 })?;
342 serde_json::from_slice::<DataArrayPayload>(&bytes).map_err(|err| {
343 data_error(format!(
344 "failed to parse payload '{}': {err}",
345 payload_path.display()
346 ))
347 })
348}
349
350pub async fn read_array_slice_payload_async(
351 root: &Path,
352 meta: &DataArrayMeta,
353 start: &[usize],
354 shape: &[usize],
355) -> BuiltinResult<DataArrayPayload> {
356 let (slice_start, slice_shape) = normalize_slice_bounds(&meta.shape, start, shape)?;
357 if let Some(index_path) = &meta.chunk_index_path {
358 let path = root.join(index_path);
359 if fs::metadata_async(&path).await.is_ok() {
360 return read_array_payload_chunked_slice_async(
361 root,
362 meta,
363 &path,
364 &slice_start,
365 &slice_shape,
366 )
367 .await;
368 }
369 }
370 let full = read_array_payload_async(root, meta).await?;
371 extract_slice_payload(&full, &slice_start, &slice_shape)
372}
373
374async fn read_array_payload_chunked_slice_async(
375 root: &Path,
376 meta: &DataArrayMeta,
377 index_path: &Path,
378 slice_start: &[usize],
379 slice_shape: &[usize],
380) -> BuiltinResult<DataArrayPayload> {
381 let bytes = fs::read_async(index_path).await.map_err(|err| {
382 data_error(format!(
383 "failed to read chunk index '{}': {err}",
384 index_path.display()
385 ))
386 })?;
387 let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
388 data_error(format!(
389 "failed to parse chunk index '{}': {err}",
390 index_path.display()
391 ))
392 })?;
393
394 let mut values = vec![0.0; slice_shape.iter().copied().product::<usize>()];
395 for chunk in index.chunks {
396 let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
397 let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
398 let chunk_extent = if chunk.shape.is_empty() {
399 chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
400 } else {
401 chunk.shape.clone()
402 };
403 if !chunk_intersects_slice(&chunk_start, &chunk_extent, slice_start, slice_shape) {
404 continue;
405 }
406
407 let chunk_path = root.join(&chunk.data_path);
408 let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
409 data_error(format!(
410 "failed to read chunk payload '{}': {err}",
411 chunk_path.display()
412 ))
413 })?;
414 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
415 data_error(format!(
416 "failed to parse chunk payload '{}': {err}",
417 chunk_path.display()
418 ))
419 })?;
420 if payload.shape != chunk_extent {
421 return Err(data_error(format!(
422 "chunk payload shape mismatch for key '{}': {:?} != {:?}",
423 chunk.key, payload.shape, chunk_extent
424 )));
425 }
426
427 let mut local = vec![0usize; chunk_extent.len()];
428 loop {
429 let mut global = Vec::with_capacity(chunk_extent.len());
430 for dim in 0..chunk_extent.len() {
431 global.push(chunk_start[dim] + local[dim]);
432 }
433 if coordinate_in_slice(&global, slice_start, slice_shape) {
434 let src_linear = linear_index_column_major(&local, &chunk_extent)?;
435 let mut dst = Vec::with_capacity(slice_shape.len());
436 for dim in 0..slice_shape.len() {
437 dst.push(global[dim].saturating_sub(slice_start[dim]));
438 }
439 let dst_linear = linear_index_column_major(&dst, slice_shape)?;
440 values[dst_linear] = payload.values[src_linear];
441 }
442 if !advance_index(&mut local, &chunk_extent) {
443 break;
444 }
445 }
446 }
447
448 Ok(DataArrayPayload {
449 dtype: meta.dtype.clone(),
450 shape: slice_shape.to_vec(),
451 values,
452 })
453}
454
455async fn read_array_payload_chunked_async(
456 root: &Path,
457 meta: &DataArrayMeta,
458 index_path: &Path,
459) -> BuiltinResult<DataArrayPayload> {
460 let bytes = fs::read_async(index_path).await.map_err(|err| {
461 data_error(format!(
462 "failed to read chunk index '{}': {err}",
463 index_path.display()
464 ))
465 })?;
466 let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
467 data_error(format!(
468 "failed to parse chunk index '{}': {err}",
469 index_path.display()
470 ))
471 })?;
472 let mut values = vec![0.0; meta.shape.iter().copied().product::<usize>()];
473 for chunk in index.chunks {
474 let chunk_path = root.join(&chunk.data_path);
475 let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
476 data_error(format!(
477 "failed to read chunk payload '{}': {err}",
478 chunk_path.display()
479 ))
480 })?;
481 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
482 data_error(format!(
483 "failed to parse chunk payload '{}': {err}",
484 chunk_path.display()
485 ))
486 })?;
487 let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
488 let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
489 let chunk_extent = if chunk.shape.is_empty() {
490 chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
491 } else {
492 chunk.shape.clone()
493 };
494 if payload.shape != chunk_extent {
495 return Err(data_error(format!(
496 "chunk payload shape mismatch for key '{}': {:?} != {:?}",
497 chunk.key, payload.shape, chunk_extent
498 )));
499 }
500 let mut local = vec![0usize; chunk_extent.len()];
501 loop {
502 let mut global = Vec::with_capacity(chunk_extent.len());
503 for dim in 0..chunk_extent.len() {
504 global.push(chunk_start[dim] + local[dim]);
505 }
506 let src_linear = linear_index_column_major(&local, &chunk_extent)?;
507 let dst_linear = linear_index_column_major(&global, &meta.shape)?;
508 values[dst_linear] = payload.values[src_linear];
509 if !advance_index(&mut local, &chunk_extent) {
510 break;
511 }
512 }
513 }
514 Ok(DataArrayPayload {
515 dtype: meta.dtype.clone(),
516 shape: meta.shape.clone(),
517 values,
518 })
519}
520
521async fn maybe_upload_chunks_async(
522 root: &Path,
523 array: &str,
524 chunks: Vec<(DataChunkDescriptor, Vec<u8>)>,
525) -> BuiltinResult<()> {
526 if chunks.is_empty() {
527 return Ok(());
528 }
529 let request = DataChunkUploadRequest {
530 dataset_path: root.to_string_lossy().to_string(),
531 array: array.to_string(),
532 chunks: chunks.iter().map(|(desc, _)| desc.clone()).collect(),
533 };
534 let targets = match fs::data_chunk_upload_targets_async(&request).await {
535 Ok(targets) => targets,
536 Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
537 Err(err) => {
538 return Err(data_error(format!(
539 "failed to request data chunk upload targets: {err}"
540 )))
541 }
542 };
543 for (descriptor, bytes) in chunks {
544 let target = find_chunk_target(&targets, &descriptor.key)?;
545 fs::data_upload_chunk_async(target, &bytes)
546 .await
547 .map_err(|err| {
548 data_error(format!(
549 "failed to upload chunk '{}': {err}",
550 descriptor.key
551 ))
552 })?;
553 tracing::info!(
554 target: "runmat.data",
555 dataset = %root.display(),
556 array = array,
557 chunk_key = descriptor.key,
558 bytes = bytes.len(),
559 "data chunk uploaded"
560 );
561 }
562 Ok(())
563}
564
565fn find_chunk_target<'a>(
566 targets: &'a [DataChunkUploadTarget],
567 key: &str,
568) -> BuiltinResult<&'a DataChunkUploadTarget> {
569 targets
570 .iter()
571 .find(|target| target.key == key)
572 .ok_or_else(|| data_error(format!("missing upload target for chunk '{key}'")))
573}
574
575pub fn sha256_hex(bytes: &[u8]) -> String {
576 let mut hasher = Sha256::new();
577 hasher.update(bytes);
578 let digest = hasher.finalize();
579 format!("sha256:{:x}", digest)
580}
581
/// Renders chunk grid coordinates as a dot-separated key, e.g. `[0, 2]` -> `"0.2"`.
///
/// Empty coordinates (rank-0 arrays) give an empty key.
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (position, coord) in coords.iter().enumerate() {
        if position > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
589
/// Number of chunks along each axis: `ceil(extent / chunk)`.
///
/// A chunk size that is missing or zero counts as 1.
fn chunk_grid_shape(shape: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut grid = Vec::with_capacity(shape.len());
    for (axis, &extent) in shape.iter().enumerate() {
        let step = match chunk_shape.get(axis) {
            Some(&size) if size > 0 => size,
            _ => 1,
        };
        grid.push(extent.div_ceil(step));
    }
    grid
}
600
/// Element offset of the chunk at grid position `coords` along each axis.
///
/// A chunk size that is missing or zero counts as 1.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(coords.len());
    for (axis, &coord) in coords.iter().enumerate() {
        let step = match chunk_shape.get(axis) {
            Some(&size) if size > 0 => size,
            _ => 1,
        };
        starts.push(coord * step);
    }
    starts
}
608
/// Extent of the chunk beginning at `start`, clipped to `full_shape`.
///
/// A chunk size that is missing or zero counts as 1. Panics if `full_shape`
/// is shorter than `start`.
fn chunk_extent_for_start(
    start: &[usize],
    chunk_shape: &[usize],
    full_shape: &[usize],
) -> Vec<usize> {
    let mut extent = Vec::with_capacity(start.len());
    for (axis, &offset) in start.iter().enumerate() {
        let step = chunk_shape
            .get(axis)
            .copied()
            .filter(|&size| size > 0)
            .unwrap_or(1);
        let stop = full_shape[axis].min(offset + step);
        extent.push(stop.saturating_sub(offset));
    }
    extent
}
624
625fn collect_chunk_values(
626 payload: &DataArrayPayload,
627 chunk_start: &[usize],
628 chunk_extent: &[usize],
629) -> BuiltinResult<Vec<f64>> {
630 let mut local = vec![0usize; chunk_extent.len()];
631 let mut values = Vec::with_capacity(chunk_extent.iter().copied().product());
632 loop {
633 let mut global = Vec::with_capacity(chunk_extent.len());
634 for dim in 0..chunk_extent.len() {
635 global.push(chunk_start[dim] + local[dim]);
636 }
637 let linear = linear_index_column_major(&global, &payload.shape)?;
638 values.push(payload.values[linear]);
639 if !advance_index(&mut local, chunk_extent) {
640 break;
641 }
642 }
643 Ok(values)
644}
645
646fn chunk_coords_from_entry(entry: &DataChunkIndexEntry, rank: usize) -> BuiltinResult<Vec<usize>> {
647 if !entry.coords.is_empty() {
648 if entry.coords.len() != rank {
649 return Err(data_error(format!(
650 "chunk coords rank mismatch for key '{}': expected {rank}, got {}",
651 entry.key,
652 entry.coords.len()
653 )));
654 }
655 return Ok(entry.coords.clone());
656 }
657 let coords = entry
658 .key
659 .split('.')
660 .map(|part| {
661 part.parse::<usize>()
662 .map_err(|_| data_error(format!("invalid chunk key '{}'", entry.key)))
663 })
664 .collect::<BuiltinResult<Vec<_>>>()?;
665 if coords.len() != rank {
666 return Err(data_error(format!(
667 "chunk key rank mismatch for key '{}': expected {rank}, got {}",
668 entry.key,
669 coords.len()
670 )));
671 }
672 Ok(coords)
673}
674
675fn normalize_slice_bounds(
676 full_shape: &[usize],
677 start: &[usize],
678 shape: &[usize],
679) -> BuiltinResult<(Vec<usize>, Vec<usize>)> {
680 if full_shape.is_empty() {
681 return Ok((Vec::new(), Vec::new()));
682 }
683 let mut normalized_start = Vec::with_capacity(full_shape.len());
684 let mut normalized_shape = Vec::with_capacity(full_shape.len());
685 for (axis, axis_len) in full_shape.iter().copied().enumerate() {
686 if axis_len == 0 {
687 return Err(data_error("slice axis length must be greater than zero"));
688 }
689 let requested_start = start.get(axis).copied().unwrap_or(0);
690 let clamped_start = requested_start.min(axis_len.saturating_sub(1));
691 let requested_span = shape.get(axis).copied().unwrap_or(axis_len);
692 let clamped_span = requested_span
693 .max(1)
694 .min(axis_len.saturating_sub(clamped_start));
695 normalized_start.push(clamped_start);
696 normalized_shape.push(clamped_span);
697 }
698 Ok((normalized_start, normalized_shape))
699}
700
/// Whether the global coordinate `global` lies inside the half-open box
/// `slice_start .. slice_start + slice_shape` on every axis of the slice.
fn coordinate_in_slice(global: &[usize], slice_start: &[usize], slice_shape: &[usize]) -> bool {
    (0..slice_shape.len()).all(|dim| {
        let lo = slice_start[dim];
        let hi = lo.saturating_add(slice_shape[dim]);
        (lo..hi).contains(&global[dim])
    })
}
712
/// Whether the chunk box `chunk_start .. chunk_start + chunk_extent` overlaps
/// the slice box `slice_start .. slice_start + slice_shape` on every axis.
///
/// Both boxes are half-open.
fn chunk_intersects_slice(
    chunk_start: &[usize],
    chunk_extent: &[usize],
    slice_start: &[usize],
    slice_shape: &[usize],
) -> bool {
    (0..slice_shape.len()).all(|dim| {
        let chunk_lo = chunk_start[dim];
        let chunk_hi = chunk_lo.saturating_add(chunk_extent[dim]);
        let slice_lo = slice_start[dim];
        let slice_hi = slice_lo.saturating_add(slice_shape[dim]);
        chunk_lo < slice_hi && slice_lo < chunk_hi
    })
}
730
731fn extract_slice_payload(
732 payload: &DataArrayPayload,
733 start: &[usize],
734 shape: &[usize],
735) -> BuiltinResult<DataArrayPayload> {
736 let mut values = Vec::with_capacity(shape.iter().copied().product());
737 if shape.is_empty() {
738 return Ok(DataArrayPayload {
739 dtype: payload.dtype.clone(),
740 shape: Vec::new(),
741 values,
742 });
743 }
744 let mut local = vec![0usize; shape.len()];
745 loop {
746 let mut global = Vec::with_capacity(shape.len());
747 for dim in 0..shape.len() {
748 global.push(start[dim] + local[dim]);
749 }
750 let linear = linear_index_column_major(&global, &payload.shape)?;
751 values.push(payload.values[linear]);
752 if !advance_index(&mut local, shape) {
753 break;
754 }
755 }
756 Ok(DataArrayPayload {
757 dtype: payload.dtype.clone(),
758 shape: shape.to_vec(),
759 values,
760 })
761}
762
763fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
764 if index.len() != shape.len() {
765 return Err(data_error("chunk index rank mismatch"));
766 }
767 let mut stride = 1usize;
768 let mut linear = 0usize;
769 for (idx, extent) in index.iter().zip(shape.iter()) {
770 if *idx >= *extent {
771 return Err(data_error("chunk index out of bounds"));
772 }
773 linear += idx * stride;
774 stride = stride.saturating_mul(*extent);
775 }
776 Ok(linear)
777}
778
/// Steps `index` to the next position in column-major order within `shape`,
/// like an odometer whose first digit turns fastest.
///
/// Returns `false` (with `index` wrapped back to all zeros) once every
/// position has been visited. An empty `shape` has no next position.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    let mut dim = 0;
    while dim < shape.len() {
        index[dim] += 1;
        if index[dim] < shape[dim] {
            return true;
        }
        index[dim] = 0;
        dim += 1;
    }
    false
}
792
793pub fn parse_schema(schema: &Value) -> BuiltinResult<DataSchema> {
794 let Value::Struct(schema_struct) = schema else {
795 return Err(data_error("data.create: schema must be a struct"));
796 };
797 let arrays_value = schema_struct
798 .fields
799 .get("arrays")
800 .ok_or_else(|| data_error("data.create: schema missing 'arrays' field"))?;
801 let Value::Struct(arrays_struct) = arrays_value else {
802 return Err(data_error("data.create: schema.arrays must be a struct"));
803 };
804
805 let mut arrays = BTreeMap::new();
806 for (name, meta_value) in &arrays_struct.fields {
807 let Value::Struct(meta_struct) = meta_value else {
808 return Err(data_error(format!(
809 "data.create: schema.arrays.{name} must be a struct"
810 )));
811 };
812 let dtype = meta_struct
813 .fields
814 .get("dtype")
815 .map(|v| parse_string(v, "data.create schema dtype"))
816 .transpose()?
817 .unwrap_or_else(|| "f64".to_string());
818 let shape = meta_struct
819 .fields
820 .get("shape")
821 .map(parse_usize_vector)
822 .transpose()?
823 .unwrap_or_else(|| vec![0, 0]);
824 let chunk_shape = meta_struct
825 .fields
826 .get("chunk")
827 .map(parse_usize_vector)
828 .transpose()?
829 .unwrap_or_else(|| default_chunk_shape(&shape));
830 let codec = meta_struct
831 .fields
832 .get("codec")
833 .map(|v| parse_string(v, "data.create schema codec"))
834 .transpose()?
835 .unwrap_or_else(|| "zstd".to_string());
836 let data_path = format!("arrays/{name}/data.f64.json");
837 let chunk_index_path = format!("arrays/{name}/chunks/index.json");
838 arrays.insert(
839 name.clone(),
840 DataArrayMeta {
841 dtype,
842 shape,
843 chunk_shape,
844 order: default_array_order(),
845 codec,
846 chunk_index_path: Some(chunk_index_path),
847 data_path,
848 },
849 );
850 }
851
852 Ok(DataSchema { arrays })
853}
854
/// Picks a default chunk shape for an array of the given `shape`.
///
/// * Rank 0 gets `[1024]`.
/// * Vectors get one chunk of up to 65 536 elements.
/// * Higher ranks cap the first two axes at 256 and the remaining axes at 8.
///
/// Every chunk extent is at least 1.
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape {
        [] => vec![1024],
        [only] => vec![(*only).clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(axis, &extent)| {
                let cap = if axis < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
871
872fn parse_usize_vector(value: &Value) -> BuiltinResult<Vec<usize>> {
873 match value {
874 Value::Tensor(t) => tensor_to_usize_vector(t),
875 Value::Num(n) => {
876 if *n < 0.0 || !n.is_finite() {
877 return Err(data_error(
878 "data schema dimensions must be non-negative finite numbers",
879 ));
880 }
881 Ok(vec![*n as usize])
882 }
883 Value::Int(i) => {
884 let n = i.to_i64();
885 if n < 0 {
886 return Err(data_error("data schema dimensions must be non-negative"));
887 }
888 Ok(vec![n as usize])
889 }
890 _ => Err(data_error(
891 "data schema dimension field must be numeric tensor/vector",
892 )),
893 }
894}
895
896fn tensor_to_usize_vector(t: &Tensor) -> BuiltinResult<Vec<usize>> {
897 let mut out = Vec::with_capacity(t.data.len());
898 for value in &t.data {
899 if !value.is_finite() || *value < 0.0 {
900 return Err(data_error(
901 "data schema dimensions must be non-negative finite numbers",
902 ));
903 }
904 out.push(*value as usize);
905 }
906 Ok(out)
907}
908
909pub fn dataset_object(path: &str, manifest: &DataManifest) -> Value {
910 let mut obj = ObjectInstance::new("Dataset".to_string());
911 obj.properties
912 .insert("__data_path".to_string(), Value::String(path.to_string()));
913 obj.properties.insert(
914 "__data_id".to_string(),
915 Value::String(manifest.dataset_id.clone()),
916 );
917 obj.properties.insert(
918 "__data_version".to_string(),
919 Value::String(manifest_version_token(manifest)),
920 );
921 Value::Object(obj)
922}
923
924pub fn manifest_version_token(manifest: &DataManifest) -> String {
925 format!("{}:{}", manifest.updated_at, manifest.txn_sequence)
926}
927
928pub fn ensure_manifest_sequence(expected: u64, manifest: &DataManifest) -> BuiltinResult<()> {
929 if manifest.txn_sequence != expected {
930 tracing::warn!(
931 target: "runmat.data",
932 expected_sequence = expected,
933 actual_sequence = manifest.txn_sequence,
934 "manifest conflict detected"
935 );
936 return Err(data_error_with_identifier(
937 "MANIFEST_CONFLICT: dataset changed since transaction begin",
938 DATA_MANIFEST_CONFLICT_IDENTIFIER,
939 ));
940 }
941 Ok(())
942}
943
944pub fn array_object(dataset_path: &str, array_name: &str) -> Value {
945 let mut obj = ObjectInstance::new("DataArray".to_string());
946 obj.properties.insert(
947 "__data_path".to_string(),
948 Value::String(dataset_path.to_string()),
949 );
950 obj.properties.insert(
951 "__array_name".to_string(),
952 Value::String(array_name.to_string()),
953 );
954 Value::Object(obj)
955}
956
957pub fn transaction_object(dataset_path: &str, tx_id: &str) -> Value {
958 let mut obj = ObjectInstance::new("DataTransaction".to_string());
959 obj.properties.insert(
960 "__data_path".to_string(),
961 Value::String(dataset_path.to_string()),
962 );
963 obj.properties
964 .insert("__tx_id".to_string(), Value::String(tx_id.to_string()));
965 Value::Object(obj)
966}
967
968pub fn get_object_prop<'a>(obj: &'a ObjectInstance, key: &str) -> BuiltinResult<&'a Value> {
969 obj.properties
970 .get(key)
971 .ok_or_else(|| data_error(format!("object missing internal property '{key}'")))
972}
973
974pub fn now_rfc3339() -> String {
975 Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
976}
977
978pub fn new_dataset_id() -> String {
979 static NEXT_DATASET_ID: AtomicU64 = AtomicU64::new(1);
980 let seq = NEXT_DATASET_ID.fetch_add(1, Ordering::Relaxed);
981 format!("ds_{}_{}", Utc::now().timestamp_millis(), seq)
982}
983
984pub fn new_tx_id() -> String {
985 static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);
986 let seq = NEXT_TX_ID.fetch_add(1, Ordering::Relaxed);
987 format!("tx_{}_{}", Utc::now().timestamp_millis(), seq)
988}
989
990pub fn start_tx(dataset_path: String, base_sequence: u64) -> String {
991 let tx_id = new_tx_id();
992 let pending = PendingTxn {
993 dataset_path,
994 base_sequence,
995 writes: Vec::new(),
996 resizes: Vec::new(),
997 fills: Vec::new(),
998 create_arrays: Vec::new(),
999 delete_arrays: Vec::new(),
1000 attrs: BTreeMap::new(),
1001 status: TxnStatus::Open,
1002 };
1003 let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1004 guard.insert(tx_id.clone(), pending);
1005 tx_id
1006}
1007
1008pub fn with_tx_mut<T>(
1009 tx_id: &str,
1010 f: impl FnOnce(&mut PendingTxn) -> BuiltinResult<T>,
1011) -> BuiltinResult<T> {
1012 let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1013 let tx = guard.get_mut(tx_id).ok_or_else(|| {
1014 data_error_with_identifier(
1015 format!("transaction '{tx_id}' not found"),
1016 DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1017 )
1018 })?;
1019 f(tx)
1020}
1021
1022pub fn with_tx<T>(
1023 tx_id: &str,
1024 f: impl FnOnce(&PendingTxn) -> BuiltinResult<T>,
1025) -> BuiltinResult<T> {
1026 let guard = tx_registry().lock().expect("tx registry lock poisoned");
1027 let tx = guard.get(tx_id).ok_or_else(|| {
1028 data_error_with_identifier(
1029 format!("transaction '{tx_id}' not found"),
1030 DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1031 )
1032 })?;
1033 f(tx)
1034}
1035
1036pub fn remove_tx(tx_id: &str) {
1037 let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1038 let _ = guard.remove(tx_id);
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal manifest fixture; only `txn_sequence` varies between tests.
    fn manifest_at_sequence(txn_sequence: u64) -> DataManifest {
        let stamp = "2026-03-01T00:00:00Z".to_string();
        DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: stamp.clone(),
            updated_at: stamp,
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence,
        }
    }

    #[test]
    fn ensure_manifest_sequence_accepts_matching_sequence() {
        let manifest = manifest_at_sequence(5);
        ensure_manifest_sequence(5, &manifest).expect("expected sequence match");
    }

    #[test]
    fn ensure_manifest_sequence_rejects_conflict() {
        let manifest = manifest_at_sequence(6);
        let err = ensure_manifest_sequence(5, &manifest).expect_err("expected conflict error");
        assert_eq!(
            err.identifier(),
            Some(DATA_MANIFEST_CONFLICT_IDENTIFIER),
            "manifest conflicts should expose a stable identifier"
        );
    }

    #[test]
    fn transaction_registry_roundtrip() {
        let tx_id = start_tx("/datasets/test.data".to_string(), 7);
        let status = with_tx(&tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
        assert_eq!(status, TxnStatus::Open);

        remove_tx(&tx_id);
        let err = with_tx(&tx_id, |_| Ok(())).expect_err("expected missing tx");
        assert_eq!(
            err.identifier(),
            Some(DATA_TRANSACTION_NOT_FOUND_IDENTIFIER),
            "missing transaction lookups should expose a stable identifier"
        );
    }

    #[test]
    fn sha256_hash_format_matches_expected_prefix() {
        let hash = sha256_hex(b"runmat");
        let hex_digits = hash
            .strip_prefix("sha256:")
            .expect("hash should start with the sha256: prefix");
        assert_eq!(hex_digits.len(), 64);
    }
}