1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap};
3use std::future::Future;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use chrono::Utc;
8use runmat_builtins::{ObjectInstance, Tensor, Value};
9use runmat_filesystem as fs;
10use runmat_filesystem::data_contract::{
11 DataChunkDescriptor, DataChunkUploadRequest, DataChunkUploadTarget,
12};
13use serde::{Deserialize, Serialize};
14use sha2::{Digest, Sha256};
15
16use crate::{build_runtime_error, BuiltinResult, RuntimeError};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct DataManifest {
20 pub schema_version: u32,
21 pub format: String,
22 pub dataset_id: String,
23 pub name: Option<String>,
24 pub created_at: String,
25 pub updated_at: String,
26 pub arrays: BTreeMap<String, DataArrayMeta>,
27 pub attrs: BTreeMap<String, serde_json::Value>,
28 pub txn_sequence: u64,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DataArrayMeta {
33 pub dtype: String,
34 pub shape: Vec<usize>,
35 pub chunk_shape: Vec<usize>,
36 #[serde(default = "default_array_order")]
37 pub order: String,
38 pub codec: String,
39 #[serde(default)]
40 pub chunk_index_path: Option<String>,
41 pub data_path: String,
42}
43
/// Serde default for `DataArrayMeta::order`: arrays are column-major unless stated otherwise.
fn default_array_order() -> String {
    String::from("column_major")
}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct DataArrayPayload {
50 pub dtype: String,
51 pub shape: Vec<usize>,
52 pub values: Vec<f64>,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct DataChunkIndex {
57 pub schema_version: u32,
58 pub array: String,
59 pub chunks: Vec<DataChunkIndexEntry>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct DataChunkIndexEntry {
64 pub key: String,
65 pub object_id: String,
66 pub hash: String,
67 pub bytes_raw: u64,
68 pub bytes_stored: u64,
69 #[serde(default)]
70 pub coords: Vec<usize>,
71 #[serde(default)]
72 pub shape: Vec<usize>,
73 pub data_path: String,
74}
75
76#[derive(Debug, Clone)]
77pub struct DataSchema {
78 pub arrays: BTreeMap<String, DataArrayMeta>,
79}
80
81#[derive(Debug, Clone)]
82pub struct PendingTxn {
83 pub dataset_path: String,
84 pub base_sequence: u64,
85 pub writes: Vec<PendingWrite>,
86 pub resizes: Vec<PendingResize>,
87 pub fills: Vec<PendingFill>,
88 pub create_arrays: Vec<PendingCreateArray>,
89 pub delete_arrays: Vec<String>,
90 pub attrs: BTreeMap<String, Value>,
91 pub status: TxnStatus,
92}
93
94#[derive(Debug, Clone)]
95pub struct PendingWrite {
96 pub array: String,
97 pub slice_spec: Option<Value>,
98 pub value: Value,
99}
100
/// A staged change of `array`'s extent to `shape`.
#[derive(Clone, Debug)]
pub struct PendingResize {
    /// Target array name.
    pub array: String,
    /// New per-axis extent.
    pub shape: Vec<usize>,
}
106
107#[derive(Debug, Clone)]
108pub struct PendingFill {
109 pub array: String,
110 pub slice_spec: Option<Value>,
111 pub value: Value,
112}
113
114#[derive(Debug, Clone)]
115pub struct PendingCreateArray {
116 pub array: String,
117 pub meta: DataArrayMeta,
118}
119
/// Lifecycle state of a [`PendingTxn`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TxnStatus {
    /// Accepting staged changes.
    Open,
    /// Changes have been committed.
    Committed,
    /// Changes were discarded.
    Aborted,
}
126
thread_local! {
    /// Per-thread transaction registry, keyed by transaction id.
    ///
    /// Used whenever no task-local registry is in scope. On wasm32 the
    /// task-local registry is not compiled, so this is the only registry there.
    static FALLBACK_TX_REGISTRY: RefCell<HashMap<String, PendingTxn>> = RefCell::new(HashMap::new());
}

#[cfg(not(target_arch = "wasm32"))]
tokio::task_local! {
    /// Task-scoped transaction registry installed by `with_tx_registry_scope`.
    ///
    /// It travels with the async task, so open transactions remain visible
    /// across `.await` points on a multi-threaded runtime (see the
    /// `transaction_registry_scope_survives_await` test).
    static TASK_TX_REGISTRY: RefCell<HashMap<String, PendingTxn>>;
}
135
136pub async fn with_tx_registry_scope<F>(future: F) -> F::Output
137where
138 F: Future,
139{
140 #[cfg(not(target_arch = "wasm32"))]
141 {
142 if TASK_TX_REGISTRY.try_with(|_| ()).is_ok() {
143 future.await
144 } else {
145 TASK_TX_REGISTRY
146 .scope(RefCell::new(HashMap::new()), future)
147 .await
148 }
149 }
150 #[cfg(target_arch = "wasm32")]
151 {
152 future.await
153 }
154}
155
156fn with_tx_registry<T>(f: impl FnOnce(&mut HashMap<String, PendingTxn>) -> T) -> BuiltinResult<T> {
157 #[cfg(not(target_arch = "wasm32"))]
158 {
159 if TASK_TX_REGISTRY.try_with(|_| ()).is_ok() {
160 return TASK_TX_REGISTRY.with(|registry| {
161 let mut registry = registry.try_borrow_mut().map_err(|_| {
162 data_error("data transaction registry is already mutably borrowed")
163 })?;
164 Ok(f(&mut registry))
165 });
166 }
167 }
168
169 FALLBACK_TX_REGISTRY.with(|registry| {
170 let mut registry = registry
171 .try_borrow_mut()
172 .map_err(|_| data_error("data transaction registry is already mutably borrowed"))?;
173 Ok(f(&mut registry))
174 })
175}
176
177pub fn data_error(message: impl Into<String>) -> RuntimeError {
178 build_runtime_error(message)
179 .with_identifier("RUNMAT:Data:Error")
180 .with_builtin("data")
181 .build()
182}
183
184fn data_error_with_identifier(
185 message: impl Into<String>,
186 identifier: &'static str,
187) -> RuntimeError {
188 build_runtime_error(message)
189 .with_identifier(identifier)
190 .with_builtin("data")
191 .build()
192}
193
/// Error identifier used by `ensure_manifest_sequence` when the manifest's
/// `txn_sequence` no longer matches the transaction's base sequence.
const DATA_MANIFEST_CONFLICT_IDENTIFIER: &str = "RunMat:data:ManifestConflict";
/// Error identifier used when a transaction id is not present in the registry.
const DATA_TRANSACTION_NOT_FOUND_IDENTIFIER: &str = "RunMat:data:TransactionNotFound";
196
197pub fn parse_string(value: &Value, context: &str) -> BuiltinResult<String> {
198 match value {
199 Value::String(s) => Ok(s.clone()),
200 Value::CharArray(ca) => Ok(ca.to_string()),
201 _ => Err(data_error(format!("{context}: expected string value"))),
202 }
203}
204
/// Converts a dataset path string into the filesystem root of that dataset.
pub fn dataset_root(path: &str) -> PathBuf {
    Path::new(path).to_path_buf()
}
208
/// Location of `manifest.json` directly under the dataset root.
pub fn manifest_path(root: &Path) -> PathBuf {
    let mut location = root.to_path_buf();
    location.push("manifest.json");
    location
}
212
/// Directory under the dataset root that holds one sub-directory per array.
pub fn arrays_root(root: &Path) -> PathBuf {
    let mut location = root.to_path_buf();
    location.push("arrays");
    location
}
216
217pub async fn write_manifest_async(root: &Path, manifest: &DataManifest) -> BuiltinResult<()> {
218 fs::create_dir_all_async(root).await.map_err(|err| {
219 data_error(format!(
220 "failed to create dataset root '{}': {err}",
221 root.display()
222 ))
223 })?;
224 let path = manifest_path(root);
225 let bytes = serde_json::to_vec_pretty(manifest)
226 .map_err(|err| data_error(format!("failed to encode manifest json: {err}")))?;
227 fs::write_async(&path, &bytes).await.map_err(|err| {
228 data_error(format!(
229 "failed to write manifest '{}': {err}",
230 path.display()
231 ))
232 })?;
233 Ok(())
234}
235
236pub async fn read_manifest_async(root: &Path) -> BuiltinResult<DataManifest> {
237 let path = manifest_path(root);
238 let bytes = fs::read_async(&path).await.map_err(|err| {
239 data_error(format!(
240 "failed to read manifest '{}': {err}",
241 path.display()
242 ))
243 })?;
244 let manifest = serde_json::from_slice::<DataManifest>(&bytes).map_err(|err| {
245 data_error(format!(
246 "failed to parse manifest '{}': {err}",
247 path.display()
248 ))
249 })?;
250 Ok(manifest)
251}
252
253pub async fn write_array_payload_async(
254 root: &Path,
255 array: &str,
256 payload: &DataArrayPayload,
257 chunk_shape: &[usize],
258) -> BuiltinResult<(PathBuf, PathBuf)> {
259 let array_dir = arrays_root(root).join(array);
260 fs::create_dir_all_async(&array_dir).await.map_err(|err| {
261 data_error(format!(
262 "failed to create array dir '{}': {err}",
263 array_dir.display()
264 ))
265 })?;
266 let payload_path = array_dir.join("data.f64.json");
267 let bytes = serde_json::to_vec(payload)
268 .map_err(|err| data_error(format!("failed to encode array payload json: {err}")))?;
269 fs::write_async(&payload_path, &bytes)
270 .await
271 .map_err(|err| {
272 data_error(format!(
273 "failed to write payload '{}': {err}",
274 payload_path.display()
275 ))
276 })?;
277
278 let chunk_dir = array_dir.join("chunks");
279 fs::create_dir_all_async(&chunk_dir).await.map_err(|err| {
280 data_error(format!(
281 "failed to create chunk dir '{}': {err}",
282 chunk_dir.display()
283 ))
284 })?;
285
286 let mut index = DataChunkIndex {
287 schema_version: 1,
288 array: array.to_string(),
289 chunks: Vec::new(),
290 };
291 let mut upload_chunks = Vec::new();
292 let grid_shape = chunk_grid_shape(&payload.shape, chunk_shape);
293 let mut coords = vec![0usize; payload.shape.len()];
294 loop {
295 let chunk_start = chunk_start_for_coords(&coords, chunk_shape);
296 let chunk_extent = chunk_extent_for_start(&chunk_start, chunk_shape, &payload.shape);
297 let chunk_payload = DataArrayPayload {
298 dtype: payload.dtype.clone(),
299 shape: chunk_extent.clone(),
300 values: collect_chunk_values(payload, &chunk_start, &chunk_extent)?,
301 };
302 let key = chunk_key(&coords);
303 let object_id = format!("obj_{}", key.replace('.', "_"));
304 let chunk_bytes = serde_json::to_vec(&chunk_payload)
305 .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
306 let data_path = chunk_dir.join(format!("{object_id}.json"));
307 fs::write_async(&data_path, &chunk_bytes)
308 .await
309 .map_err(|err| {
310 data_error(format!(
311 "failed to write chunk '{}': {err}",
312 data_path.display()
313 ))
314 })?;
315 let hash = sha256_hex(&chunk_bytes);
316 let rel_chunk_path = data_path
317 .strip_prefix(root)
318 .map_err(|err| data_error(format!("failed to compute chunk relative path: {err}")))?
319 .to_string_lossy()
320 .to_string();
321 index.chunks.push(DataChunkIndexEntry {
322 key: key.clone(),
323 object_id: object_id.clone(),
324 hash: hash.clone(),
325 bytes_raw: chunk_bytes.len() as u64,
326 bytes_stored: chunk_bytes.len() as u64,
327 coords: coords.clone(),
328 shape: chunk_extent,
329 data_path: rel_chunk_path,
330 });
331 upload_chunks.push((
332 DataChunkDescriptor {
333 key,
334 object_id,
335 hash,
336 bytes_raw: chunk_bytes.len() as u64,
337 bytes_stored: chunk_bytes.len() as u64,
338 },
339 chunk_bytes,
340 ));
341 if !advance_index(&mut coords, &grid_shape) {
342 break;
343 }
344 }
345
346 maybe_upload_chunks_async(root, array, upload_chunks).await?;
347
348 tracing::info!(
349 target: "runmat.data",
350 dataset = %root.display(),
351 array = array,
352 chunks = index.chunks.len(),
353 payload_bytes = bytes.len(),
354 "data chunk write planned"
355 );
356
357 let chunk_index_path = chunk_dir.join("index.json");
358 let chunk_index_bytes = serde_json::to_vec(&index)
359 .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
360 fs::write_async(&chunk_index_path, &chunk_index_bytes)
361 .await
362 .map_err(|err| {
363 data_error(format!(
364 "failed to write chunk index '{}': {err}",
365 chunk_index_path.display()
366 ))
367 })?;
368 Ok((payload_path, chunk_index_path))
369}
370
371pub async fn read_array_payload_async(
372 root: &Path,
373 meta: &DataArrayMeta,
374) -> BuiltinResult<DataArrayPayload> {
375 if let Some(index_path) = &meta.chunk_index_path {
376 let path = root.join(index_path);
377 if fs::metadata_async(&path).await.is_ok() {
378 return read_array_payload_chunked_async(root, meta, &path).await;
379 }
380 }
381 let payload_path = root.join(&meta.data_path);
382 let bytes = fs::read_async(&payload_path).await.map_err(|err| {
383 data_error(format!(
384 "failed to read payload '{}': {err}",
385 payload_path.display()
386 ))
387 })?;
388 serde_json::from_slice::<DataArrayPayload>(&bytes).map_err(|err| {
389 data_error(format!(
390 "failed to parse payload '{}': {err}",
391 payload_path.display()
392 ))
393 })
394}
395
396pub async fn read_array_slice_payload_async(
397 root: &Path,
398 meta: &DataArrayMeta,
399 start: &[usize],
400 shape: &[usize],
401) -> BuiltinResult<DataArrayPayload> {
402 let (slice_start, slice_shape) = normalize_slice_bounds(&meta.shape, start, shape)?;
403 if let Some(index_path) = &meta.chunk_index_path {
404 let path = root.join(index_path);
405 if fs::metadata_async(&path).await.is_ok() {
406 return read_array_payload_chunked_slice_async(
407 root,
408 meta,
409 &path,
410 &slice_start,
411 &slice_shape,
412 )
413 .await;
414 }
415 }
416 let full = read_array_payload_async(root, meta).await?;
417 extract_slice_payload(&full, &slice_start, &slice_shape)
418}
419
420async fn read_array_payload_chunked_slice_async(
421 root: &Path,
422 meta: &DataArrayMeta,
423 index_path: &Path,
424 slice_start: &[usize],
425 slice_shape: &[usize],
426) -> BuiltinResult<DataArrayPayload> {
427 let bytes = fs::read_async(index_path).await.map_err(|err| {
428 data_error(format!(
429 "failed to read chunk index '{}': {err}",
430 index_path.display()
431 ))
432 })?;
433 let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
434 data_error(format!(
435 "failed to parse chunk index '{}': {err}",
436 index_path.display()
437 ))
438 })?;
439
440 let mut values = vec![0.0; slice_shape.iter().copied().product::<usize>()];
441 for chunk in index.chunks {
442 let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
443 let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
444 let chunk_extent = if chunk.shape.is_empty() {
445 chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
446 } else {
447 chunk.shape.clone()
448 };
449 if !chunk_intersects_slice(&chunk_start, &chunk_extent, slice_start, slice_shape) {
450 continue;
451 }
452
453 let chunk_path = root.join(&chunk.data_path);
454 let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
455 data_error(format!(
456 "failed to read chunk payload '{}': {err}",
457 chunk_path.display()
458 ))
459 })?;
460 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
461 data_error(format!(
462 "failed to parse chunk payload '{}': {err}",
463 chunk_path.display()
464 ))
465 })?;
466 if payload.shape != chunk_extent {
467 return Err(data_error(format!(
468 "chunk payload shape mismatch for key '{}': {:?} != {:?}",
469 chunk.key, payload.shape, chunk_extent
470 )));
471 }
472
473 let mut local = vec![0usize; chunk_extent.len()];
474 loop {
475 let mut global = Vec::with_capacity(chunk_extent.len());
476 for dim in 0..chunk_extent.len() {
477 global.push(chunk_start[dim] + local[dim]);
478 }
479 if coordinate_in_slice(&global, slice_start, slice_shape) {
480 let src_linear = linear_index_column_major(&local, &chunk_extent)?;
481 let mut dst = Vec::with_capacity(slice_shape.len());
482 for dim in 0..slice_shape.len() {
483 dst.push(global[dim].saturating_sub(slice_start[dim]));
484 }
485 let dst_linear = linear_index_column_major(&dst, slice_shape)?;
486 values[dst_linear] = payload.values[src_linear];
487 }
488 if !advance_index(&mut local, &chunk_extent) {
489 break;
490 }
491 }
492 }
493
494 Ok(DataArrayPayload {
495 dtype: meta.dtype.clone(),
496 shape: slice_shape.to_vec(),
497 values,
498 })
499}
500
501async fn read_array_payload_chunked_async(
502 root: &Path,
503 meta: &DataArrayMeta,
504 index_path: &Path,
505) -> BuiltinResult<DataArrayPayload> {
506 let bytes = fs::read_async(index_path).await.map_err(|err| {
507 data_error(format!(
508 "failed to read chunk index '{}': {err}",
509 index_path.display()
510 ))
511 })?;
512 let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
513 data_error(format!(
514 "failed to parse chunk index '{}': {err}",
515 index_path.display()
516 ))
517 })?;
518 let mut values = vec![0.0; meta.shape.iter().copied().product::<usize>()];
519 for chunk in index.chunks {
520 let chunk_path = root.join(&chunk.data_path);
521 let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
522 data_error(format!(
523 "failed to read chunk payload '{}': {err}",
524 chunk_path.display()
525 ))
526 })?;
527 let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
528 data_error(format!(
529 "failed to parse chunk payload '{}': {err}",
530 chunk_path.display()
531 ))
532 })?;
533 let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
534 let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
535 let chunk_extent = if chunk.shape.is_empty() {
536 chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
537 } else {
538 chunk.shape.clone()
539 };
540 if payload.shape != chunk_extent {
541 return Err(data_error(format!(
542 "chunk payload shape mismatch for key '{}': {:?} != {:?}",
543 chunk.key, payload.shape, chunk_extent
544 )));
545 }
546 let mut local = vec![0usize; chunk_extent.len()];
547 loop {
548 let mut global = Vec::with_capacity(chunk_extent.len());
549 for dim in 0..chunk_extent.len() {
550 global.push(chunk_start[dim] + local[dim]);
551 }
552 let src_linear = linear_index_column_major(&local, &chunk_extent)?;
553 let dst_linear = linear_index_column_major(&global, &meta.shape)?;
554 values[dst_linear] = payload.values[src_linear];
555 if !advance_index(&mut local, &chunk_extent) {
556 break;
557 }
558 }
559 }
560 Ok(DataArrayPayload {
561 dtype: meta.dtype.clone(),
562 shape: meta.shape.clone(),
563 values,
564 })
565}
566
567async fn maybe_upload_chunks_async(
568 root: &Path,
569 array: &str,
570 chunks: Vec<(DataChunkDescriptor, Vec<u8>)>,
571) -> BuiltinResult<()> {
572 if chunks.is_empty() {
573 return Ok(());
574 }
575 let request = DataChunkUploadRequest {
576 dataset_path: root.to_string_lossy().to_string(),
577 array: array.to_string(),
578 chunks: chunks.iter().map(|(desc, _)| desc.clone()).collect(),
579 };
580 let targets = match fs::data_chunk_upload_targets_async(&request).await {
581 Ok(targets) => targets,
582 Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
583 Err(err) => {
584 return Err(data_error(format!(
585 "failed to request data chunk upload targets: {err}"
586 )))
587 }
588 };
589 for (descriptor, bytes) in chunks {
590 let target = find_chunk_target(&targets, &descriptor.key)?;
591 fs::data_upload_chunk_async(target, &bytes)
592 .await
593 .map_err(|err| {
594 data_error(format!(
595 "failed to upload chunk '{}': {err}",
596 descriptor.key
597 ))
598 })?;
599 tracing::info!(
600 target: "runmat.data",
601 dataset = %root.display(),
602 array = array,
603 chunk_key = descriptor.key,
604 bytes = bytes.len(),
605 "data chunk uploaded"
606 );
607 }
608 Ok(())
609}
610
611fn find_chunk_target<'a>(
612 targets: &'a [DataChunkUploadTarget],
613 key: &str,
614) -> BuiltinResult<&'a DataChunkUploadTarget> {
615 targets
616 .iter()
617 .find(|target| target.key == key)
618 .ok_or_else(|| data_error(format!("missing upload target for chunk '{key}'")))
619}
620
621pub fn sha256_hex(bytes: &[u8]) -> String {
622 let mut hasher = Sha256::new();
623 hasher.update(bytes);
624 let digest = hasher.finalize();
625 format!("sha256:{:x}", digest)
626}
627
/// Joins chunk-grid coordinates with `.` (e.g. `[0, 2]` becomes `"0.2"`).
/// A rank-0 coordinate list yields the empty string.
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (position, coord) in coords.iter().enumerate() {
        if position > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
635
/// Number of chunks along each axis of `shape`.
///
/// A missing or zero chunk size is treated as 1.
fn chunk_grid_shape(shape: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut grid = Vec::with_capacity(shape.len());
    for (axis, &extent) in shape.iter().enumerate() {
        let step = match chunk_shape.get(axis) {
            Some(&size) if size > 0 => size,
            _ => 1,
        };
        grid.push(extent.div_ceil(step));
    }
    grid
}
646
/// Element offset of the chunk at grid position `coords`.
///
/// A missing or zero chunk size is treated as 1.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut starts = Vec::with_capacity(coords.len());
    for (axis, &coord) in coords.iter().enumerate() {
        let step = chunk_shape.get(axis).map_or(1, |&size| size.max(1));
        starts.push(coord * step);
    }
    starts
}
654
/// Extent of the chunk starting at `start`, clipped to `full_shape`.
///
/// A missing or zero chunk size is treated as 1. A start at or past the end of
/// an axis gives extent 0 on that axis.
fn chunk_extent_for_start(
    start: &[usize],
    chunk_shape: &[usize],
    full_shape: &[usize],
) -> Vec<usize> {
    let mut extent = Vec::with_capacity(start.len());
    for (axis, &begin) in start.iter().enumerate() {
        let step = chunk_shape.get(axis).map_or(1, |&size| size.max(1));
        let end = (begin + step).min(full_shape[axis]);
        extent.push(end.saturating_sub(begin));
    }
    extent
}
670
671fn collect_chunk_values(
672 payload: &DataArrayPayload,
673 chunk_start: &[usize],
674 chunk_extent: &[usize],
675) -> BuiltinResult<Vec<f64>> {
676 let mut local = vec![0usize; chunk_extent.len()];
677 let mut values = Vec::with_capacity(chunk_extent.iter().copied().product());
678 loop {
679 let mut global = Vec::with_capacity(chunk_extent.len());
680 for dim in 0..chunk_extent.len() {
681 global.push(chunk_start[dim] + local[dim]);
682 }
683 let linear = linear_index_column_major(&global, &payload.shape)?;
684 values.push(payload.values[linear]);
685 if !advance_index(&mut local, chunk_extent) {
686 break;
687 }
688 }
689 Ok(values)
690}
691
692fn chunk_coords_from_entry(entry: &DataChunkIndexEntry, rank: usize) -> BuiltinResult<Vec<usize>> {
693 if !entry.coords.is_empty() {
694 if entry.coords.len() != rank {
695 return Err(data_error(format!(
696 "chunk coords rank mismatch for key '{}': expected {rank}, got {}",
697 entry.key,
698 entry.coords.len()
699 )));
700 }
701 return Ok(entry.coords.clone());
702 }
703 let coords = entry
704 .key
705 .split('.')
706 .map(|part| {
707 part.parse::<usize>()
708 .map_err(|_| data_error(format!("invalid chunk key '{}'", entry.key)))
709 })
710 .collect::<BuiltinResult<Vec<_>>>()?;
711 if coords.len() != rank {
712 return Err(data_error(format!(
713 "chunk key rank mismatch for key '{}': expected {rank}, got {}",
714 entry.key,
715 coords.len()
716 )));
717 }
718 Ok(coords)
719}
720
721fn normalize_slice_bounds(
722 full_shape: &[usize],
723 start: &[usize],
724 shape: &[usize],
725) -> BuiltinResult<(Vec<usize>, Vec<usize>)> {
726 if full_shape.is_empty() {
727 return Ok((Vec::new(), Vec::new()));
728 }
729 let mut normalized_start = Vec::with_capacity(full_shape.len());
730 let mut normalized_shape = Vec::with_capacity(full_shape.len());
731 for (axis, axis_len) in full_shape.iter().copied().enumerate() {
732 if axis_len == 0 {
733 return Err(data_error("slice axis length must be greater than zero"));
734 }
735 let requested_start = start.get(axis).copied().unwrap_or(0);
736 let clamped_start = requested_start.min(axis_len.saturating_sub(1));
737 let requested_span = shape.get(axis).copied().unwrap_or(axis_len);
738 let clamped_span = requested_span
739 .max(1)
740 .min(axis_len.saturating_sub(clamped_start));
741 normalized_start.push(clamped_start);
742 normalized_shape.push(clamped_span);
743 }
744 Ok((normalized_start, normalized_shape))
745}
746
/// True when `global` lies inside `[slice_start, slice_start + slice_shape)` on every axis.
fn coordinate_in_slice(global: &[usize], slice_start: &[usize], slice_shape: &[usize]) -> bool {
    (0..slice_shape.len()).all(|axis| {
        let lower = slice_start[axis];
        let upper = lower.saturating_add(slice_shape[axis]);
        (lower..upper).contains(&global[axis])
    })
}
758
/// True when the chunk box and the slice box overlap on every axis.
///
/// Both boxes are half-open intervals per axis, so boxes that merely touch do not intersect.
fn chunk_intersects_slice(
    chunk_start: &[usize],
    chunk_extent: &[usize],
    slice_start: &[usize],
    slice_shape: &[usize],
) -> bool {
    (0..slice_shape.len()).all(|axis| {
        let chunk_begin = chunk_start[axis];
        let chunk_end = chunk_begin.saturating_add(chunk_extent[axis]);
        let slice_begin = slice_start[axis];
        let slice_end = slice_begin.saturating_add(slice_shape[axis]);
        chunk_begin < slice_end && slice_begin < chunk_end
    })
}
776
777fn extract_slice_payload(
778 payload: &DataArrayPayload,
779 start: &[usize],
780 shape: &[usize],
781) -> BuiltinResult<DataArrayPayload> {
782 let mut values = Vec::with_capacity(shape.iter().copied().product());
783 if shape.is_empty() {
784 return Ok(DataArrayPayload {
785 dtype: payload.dtype.clone(),
786 shape: Vec::new(),
787 values,
788 });
789 }
790 let mut local = vec![0usize; shape.len()];
791 loop {
792 let mut global = Vec::with_capacity(shape.len());
793 for dim in 0..shape.len() {
794 global.push(start[dim] + local[dim]);
795 }
796 let linear = linear_index_column_major(&global, &payload.shape)?;
797 values.push(payload.values[linear]);
798 if !advance_index(&mut local, shape) {
799 break;
800 }
801 }
802 Ok(DataArrayPayload {
803 dtype: payload.dtype.clone(),
804 shape: shape.to_vec(),
805 values,
806 })
807}
808
809fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
810 if index.len() != shape.len() {
811 return Err(data_error("chunk index rank mismatch"));
812 }
813 let mut stride = 1usize;
814 let mut linear = 0usize;
815 for (idx, extent) in index.iter().zip(shape.iter()) {
816 if *idx >= *extent {
817 return Err(data_error("chunk index out of bounds"));
818 }
819 linear += idx * stride;
820 stride = stride.saturating_mul(*extent);
821 }
822 Ok(linear)
823}
824
/// Steps `index` to the next position inside `shape`, in column-major order (axis 0 fastest).
///
/// Returns `false` once every position has been visited. At that point
/// `index` has wrapped back to all zeros. Always returns `false` for an empty `shape`.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    for (axis, &extent) in shape.iter().enumerate() {
        let slot = &mut index[axis];
        *slot += 1;
        if *slot < extent {
            return true;
        }
        *slot = 0;
    }
    false
}
838
839pub fn parse_schema(schema: &Value) -> BuiltinResult<DataSchema> {
840 let Value::Struct(schema_struct) = schema else {
841 return Err(data_error("data.create: schema must be a struct"));
842 };
843 let arrays_value = schema_struct
844 .fields
845 .get("arrays")
846 .ok_or_else(|| data_error("data.create: schema missing 'arrays' field"))?;
847 let Value::Struct(arrays_struct) = arrays_value else {
848 return Err(data_error("data.create: schema.arrays must be a struct"));
849 };
850
851 let mut arrays = BTreeMap::new();
852 for (name, meta_value) in &arrays_struct.fields {
853 let Value::Struct(meta_struct) = meta_value else {
854 return Err(data_error(format!(
855 "data.create: schema.arrays.{name} must be a struct"
856 )));
857 };
858 let dtype = meta_struct
859 .fields
860 .get("dtype")
861 .map(|v| parse_string(v, "data.create schema dtype"))
862 .transpose()?
863 .unwrap_or_else(|| "f64".to_string());
864 let shape = meta_struct
865 .fields
866 .get("shape")
867 .map(parse_usize_vector)
868 .transpose()?
869 .unwrap_or_else(|| vec![0, 0]);
870 let chunk_shape = meta_struct
871 .fields
872 .get("chunk")
873 .map(parse_usize_vector)
874 .transpose()?
875 .unwrap_or_else(|| default_chunk_shape(&shape));
876 let codec = meta_struct
877 .fields
878 .get("codec")
879 .map(|v| parse_string(v, "data.create schema codec"))
880 .transpose()?
881 .unwrap_or_else(|| "zstd".to_string());
882 let data_path = format!("arrays/{name}/data.f64.json");
883 let chunk_index_path = format!("arrays/{name}/chunks/index.json");
884 arrays.insert(
885 name.clone(),
886 DataArrayMeta {
887 dtype,
888 shape,
889 chunk_shape,
890 order: default_array_order(),
891 codec,
892 chunk_index_path: Some(chunk_index_path),
893 data_path,
894 },
895 );
896 }
897
898 Ok(DataSchema { arrays })
899}
900
/// Chunk shape used when a schema omits `chunk`.
///
/// - Rank 0 gets `[1024]`.
/// - Rank 1 clamps its single axis to `1..=65_536`.
/// - Higher ranks clamp the first two axes to `1..=256` and the rest to `1..=8`.
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape.len() {
        0 => vec![1024],
        1 => vec![shape[0].clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(axis, &extent)| {
                let cap = if axis < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
917
918fn parse_usize_vector(value: &Value) -> BuiltinResult<Vec<usize>> {
919 match value {
920 Value::Tensor(t) => tensor_to_usize_vector(t),
921 Value::Num(n) => {
922 if *n < 0.0 || !n.is_finite() {
923 return Err(data_error(
924 "data schema dimensions must be non-negative finite numbers",
925 ));
926 }
927 Ok(vec![*n as usize])
928 }
929 Value::Int(i) => {
930 let n = i.to_i64();
931 if n < 0 {
932 return Err(data_error("data schema dimensions must be non-negative"));
933 }
934 Ok(vec![n as usize])
935 }
936 _ => Err(data_error(
937 "data schema dimension field must be numeric tensor/vector",
938 )),
939 }
940}
941
942fn tensor_to_usize_vector(t: &Tensor) -> BuiltinResult<Vec<usize>> {
943 let mut out = Vec::with_capacity(t.data.len());
944 for value in &t.data {
945 if !value.is_finite() || *value < 0.0 {
946 return Err(data_error(
947 "data schema dimensions must be non-negative finite numbers",
948 ));
949 }
950 out.push(*value as usize);
951 }
952 Ok(out)
953}
954
955pub fn dataset_object(path: &str, manifest: &DataManifest) -> Value {
956 let mut obj = ObjectInstance::new("Dataset".to_string());
957 obj.properties
958 .insert("__data_path".to_string(), Value::String(path.to_string()));
959 obj.properties.insert(
960 "__data_id".to_string(),
961 Value::String(manifest.dataset_id.clone()),
962 );
963 obj.properties.insert(
964 "__data_version".to_string(),
965 Value::String(manifest_version_token(manifest)),
966 );
967 Value::Object(obj)
968}
969
970pub fn manifest_version_token(manifest: &DataManifest) -> String {
971 format!("{}:{}", manifest.updated_at, manifest.txn_sequence)
972}
973
974pub fn ensure_manifest_sequence(expected: u64, manifest: &DataManifest) -> BuiltinResult<()> {
975 if manifest.txn_sequence != expected {
976 tracing::warn!(
977 target: "runmat.data",
978 expected_sequence = expected,
979 actual_sequence = manifest.txn_sequence,
980 "manifest conflict detected"
981 );
982 return Err(data_error_with_identifier(
983 "MANIFEST_CONFLICT: dataset changed since transaction begin",
984 DATA_MANIFEST_CONFLICT_IDENTIFIER,
985 ));
986 }
987 Ok(())
988}
989
990pub fn array_object(dataset_path: &str, array_name: &str) -> Value {
991 let mut obj = ObjectInstance::new("DataArray".to_string());
992 obj.properties.insert(
993 "__data_path".to_string(),
994 Value::String(dataset_path.to_string()),
995 );
996 obj.properties.insert(
997 "__array_name".to_string(),
998 Value::String(array_name.to_string()),
999 );
1000 Value::Object(obj)
1001}
1002
1003pub fn transaction_object(dataset_path: &str, tx_id: &str) -> Value {
1004 let mut obj = ObjectInstance::new("DataTransaction".to_string());
1005 obj.properties.insert(
1006 "__data_path".to_string(),
1007 Value::String(dataset_path.to_string()),
1008 );
1009 obj.properties
1010 .insert("__tx_id".to_string(), Value::String(tx_id.to_string()));
1011 Value::Object(obj)
1012}
1013
1014pub fn get_object_prop<'a>(obj: &'a ObjectInstance, key: &str) -> BuiltinResult<&'a Value> {
1015 obj.properties
1016 .get(key)
1017 .ok_or_else(|| data_error(format!("object missing internal property '{key}'")))
1018}
1019
1020pub fn now_rfc3339() -> String {
1021 Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
1022}
1023
1024pub fn new_dataset_id() -> String {
1025 static NEXT_DATASET_ID: AtomicU64 = AtomicU64::new(1);
1026 let seq = NEXT_DATASET_ID.fetch_add(1, Ordering::Relaxed);
1027 format!("ds_{}_{}", Utc::now().timestamp_millis(), seq)
1028}
1029
1030pub fn new_tx_id() -> String {
1031 static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);
1032 let seq = NEXT_TX_ID.fetch_add(1, Ordering::Relaxed);
1033 format!("tx_{}_{}", Utc::now().timestamp_millis(), seq)
1034}
1035
1036pub fn start_tx(dataset_path: String, base_sequence: u64) -> BuiltinResult<String> {
1037 let tx_id = new_tx_id();
1038 let pending = PendingTxn {
1039 dataset_path,
1040 base_sequence,
1041 writes: Vec::new(),
1042 resizes: Vec::new(),
1043 fills: Vec::new(),
1044 create_arrays: Vec::new(),
1045 delete_arrays: Vec::new(),
1046 attrs: BTreeMap::new(),
1047 status: TxnStatus::Open,
1048 };
1049 with_tx_registry(|registry| {
1050 registry.insert(tx_id.clone(), pending);
1051 })?;
1052 Ok(tx_id)
1053}
1054
1055pub fn with_tx_mut<T>(
1056 tx_id: &str,
1057 f: impl FnOnce(&mut PendingTxn) -> BuiltinResult<T>,
1058) -> BuiltinResult<T> {
1059 with_tx_registry(|registry| {
1060 let tx = registry.get_mut(tx_id).ok_or_else(|| {
1061 data_error_with_identifier(
1062 format!("transaction '{tx_id}' not found"),
1063 DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1064 )
1065 })?;
1066 f(tx)
1067 })?
1068}
1069
1070pub fn with_tx<T>(
1071 tx_id: &str,
1072 f: impl FnOnce(&PendingTxn) -> BuiltinResult<T>,
1073) -> BuiltinResult<T> {
1074 #[cfg(not(target_arch = "wasm32"))]
1075 {
1076 if TASK_TX_REGISTRY.try_with(|_| ()).is_ok() {
1077 return TASK_TX_REGISTRY.with(|registry| {
1078 let registry = registry
1079 .try_borrow()
1080 .map_err(|_| data_error("data transaction registry is already borrowed"))?;
1081 let tx = registry.get(tx_id).ok_or_else(|| {
1082 data_error_with_identifier(
1083 format!("transaction '{tx_id}' not found"),
1084 DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1085 )
1086 })?;
1087 f(tx)
1088 });
1089 }
1090 }
1091
1092 FALLBACK_TX_REGISTRY.with(|registry| {
1093 let registry = registry
1094 .try_borrow()
1095 .map_err(|_| data_error("data transaction registry is already borrowed"))?;
1096 let tx = registry.get(tx_id).ok_or_else(|| {
1097 data_error_with_identifier(
1098 format!("transaction '{tx_id}' not found"),
1099 DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1100 )
1101 })?;
1102 f(tx)
1103 })
1104}
1105
1106pub fn remove_tx(tx_id: &str) -> BuiltinResult<()> {
1107 with_tx_registry(|registry| {
1108 let _ = registry.remove(tx_id);
1109 })
1110}
1111
#[cfg(test)]
mod tests {
    use super::*;

    /// Manifest fixture that varies only in its transaction sequence.
    fn manifest_with_sequence(txn_sequence: u64) -> DataManifest {
        DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: "2026-03-01T00:00:00Z".to_string(),
            updated_at: "2026-03-01T00:00:00Z".to_string(),
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence,
        }
    }

    #[test]
    fn ensure_manifest_sequence_accepts_matching_sequence() {
        let manifest = manifest_with_sequence(5);
        ensure_manifest_sequence(5, &manifest).expect("expected sequence match");
    }

    #[test]
    fn ensure_manifest_sequence_rejects_conflict() {
        let manifest = manifest_with_sequence(6);
        let err = ensure_manifest_sequence(5, &manifest).expect_err("expected conflict error");
        assert_eq!(
            err.identifier(),
            Some(DATA_MANIFEST_CONFLICT_IDENTIFIER),
            "manifest conflicts should expose a stable identifier"
        );
    }

    #[test]
    fn transaction_registry_roundtrip() {
        let tx_id = start_tx("/datasets/test.data".to_string(), 7).expect("start tx");
        let status = with_tx(&tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
        assert_eq!(status, TxnStatus::Open);

        remove_tx(&tx_id).expect("remove tx");
        let missing = with_tx(&tx_id, |_| Ok(())).expect_err("expected missing tx");
        assert_eq!(
            missing.identifier(),
            Some(DATA_TRANSACTION_NOT_FOUND_IDENTIFIER),
            "missing transaction lookups should expose a stable identifier"
        );
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn transaction_registry_scope_survives_await() {
        with_tx_registry_scope(async {
            let tx_id = start_tx("/datasets/task-local.data".to_string(), 11).expect("start tx");
            // Yielding may resume the task on another worker thread; the
            // task-local registry must still hold the transaction.
            tokio::task::yield_now().await;
            let status = with_tx(&tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
            assert_eq!(status, TxnStatus::Open);

            remove_tx(&tx_id).expect("remove tx");
            let missing = with_tx(&tx_id, |_| Ok(())).expect_err("expected missing tx");
            assert_eq!(
                missing.identifier(),
                Some(DATA_TRANSACTION_NOT_FOUND_IDENTIFIER)
            );
        })
        .await;
    }

    #[test]
    fn sha256_hash_format_matches_expected_prefix() {
        let hash = sha256_hex(b"runmat");
        let hex_digits = hash
            .strip_prefix("sha256:")
            .expect("hash should carry the sha256: prefix");
        assert_eq!(hex_digits.len(), 64);
    }
}