Skip to main content

runmat_runtime/data/
mod.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6use chrono::Utc;
7use runmat_builtins::{ObjectInstance, Tensor, Value};
8use runmat_filesystem as fs;
9use runmat_filesystem::data_contract::{
10    DataChunkDescriptor, DataChunkUploadRequest, DataChunkUploadTarget,
11};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14
15use crate::{build_runtime_error, BuiltinResult, RuntimeError};
16
/// Top-level description of a dataset, stored as JSON in `manifest.json` under the dataset root
/// (see `write_manifest_async` / `read_manifest_async`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataManifest {
    /// Version number of the manifest layout.
    pub schema_version: u32,
    /// Name of the on-disk format.
    pub format: String,
    /// Dataset identifier; exposed as `__data_id` on the object built by `dataset_object`.
    pub dataset_id: String,
    /// Optional dataset name.
    pub name: Option<String>,
    /// Creation timestamp as a string. NOTE(review): presumably RFC 3339, as produced by
    /// `now_rfc3339` — confirm against the writer.
    pub created_at: String,
    /// Last-update timestamp as a string; forms the first half of `manifest_version_token`.
    pub updated_at: String,
    /// Per-array metadata, keyed by array name.
    pub arrays: BTreeMap<String, DataArrayMeta>,
    /// Free-form JSON attributes, keyed by attribute name.
    pub attrs: BTreeMap<String, serde_json::Value>,
    /// Sequence number compared by `ensure_manifest_sequence` to detect manifest conflicts;
    /// forms the second half of `manifest_version_token`.
    pub txn_sequence: u64,
}
29
/// Metadata describing one array of a dataset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayMeta {
    /// Element type name (`parse_schema` defaults it to `"f64"`).
    pub dtype: String,
    /// Extent of the array along each axis.
    pub shape: Vec<usize>,
    /// Maximum extent of a single chunk along each axis.
    pub chunk_shape: Vec<usize>,
    /// Memory order name; deserializes to `"column_major"` when the field is absent.
    #[serde(default = "default_array_order")]
    pub order: String,
    /// Codec name (`parse_schema` defaults it to `"zstd"`).
    /// NOTE(review): the read/write paths in this file store plain JSON and never consult this
    /// field — confirm where the codec is applied.
    pub codec: String,
    /// Path of the chunk index, joined onto the dataset root by the readers; deserializes to
    /// `None` when the field is absent.
    #[serde(default)]
    pub chunk_index_path: Option<String>,
    /// Path of the whole-array payload file, joined onto the dataset root by the readers.
    pub data_path: String,
}
42
/// Serde fallback for `DataArrayMeta::order` when the field is missing from the JSON.
fn default_array_order() -> String {
    String::from("column_major")
}
46
/// Array (or chunk) contents as stored in the JSON payload files.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayPayload {
    /// Element type name.
    pub dtype: String,
    /// Extent along each axis.
    pub shape: Vec<usize>,
    /// Flattened elements; the code in this file addresses them with
    /// `linear_index_column_major`, i.e. the first axis varies fastest.
    pub values: Vec<f64>,
}
53
/// Index of the chunks of one array, stored as JSON next to the chunk files
/// (`write_array_payload_async` writes it to `chunks/index.json`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndex {
    /// Version number of the index layout (the writer in this file emits `1`).
    pub schema_version: u32,
    /// Name of the array the chunks belong to.
    pub array: String,
    /// One entry per stored chunk.
    pub chunks: Vec<DataChunkIndexEntry>,
}
60
/// Description of a single stored chunk inside a `DataChunkIndex`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndexEntry {
    /// Chunk grid coordinates joined with `.` (see `chunk_key`).
    pub key: String,
    /// Object identifier; the writer in this file uses `obj_` plus the key with `.` replaced
    /// by `_`.
    pub object_id: String,
    /// Digest of the encoded chunk bytes in the `sha256:<hex>` form produced by `sha256_hex`.
    pub hash: String,
    /// Size of the chunk bytes before storage encoding.
    pub bytes_raw: u64,
    /// Size of the chunk bytes as stored (the writer in this file sets it equal to
    /// `bytes_raw`).
    pub bytes_stored: u64,
    /// Chunk grid coordinates; when empty (e.g. absent from the JSON) readers parse them from
    /// `key` instead.
    #[serde(default)]
    pub coords: Vec<usize>,
    /// Extent of this chunk along each axis; when empty readers derive it from the array's
    /// chunk shape and full shape.
    #[serde(default)]
    pub shape: Vec<usize>,
    /// Path of the chunk payload file, joined onto the dataset root by the readers.
    pub data_path: String,
}
74
/// Parsed dataset schema as returned by `parse_schema`.
#[derive(Debug, Clone)]
pub struct DataSchema {
    /// Metadata for each declared array, keyed by array name.
    pub arrays: BTreeMap<String, DataArrayMeta>,
}
79
/// In-memory record of a dataset transaction; values of this type are held in the map
/// returned by `tx_registry`.
#[derive(Debug, Clone)]
pub struct PendingTxn {
    /// Path of the dataset the transaction targets.
    pub dataset_path: String,
    /// NOTE(review): presumably the manifest `txn_sequence` observed when the transaction
    /// began, later checked with `ensure_manifest_sequence` — confirm against the caller.
    pub base_sequence: u64,
    /// Queued array writes.
    pub writes: Vec<PendingWrite>,
    /// Queued array resizes.
    pub resizes: Vec<PendingResize>,
    /// Queued array fills.
    pub fills: Vec<PendingFill>,
    /// Queued array creations.
    pub create_arrays: Vec<PendingCreateArray>,
    /// Names of arrays queued for deletion.
    pub delete_arrays: Vec<String>,
    /// Queued attribute values, keyed by attribute name.
    pub attrs: BTreeMap<String, Value>,
    /// Current lifecycle state of the transaction.
    pub status: TxnStatus,
}
92
/// A write queued inside a `PendingTxn`.
#[derive(Debug, Clone)]
pub struct PendingWrite {
    /// Name of the target array.
    pub array: String,
    /// Optional slice specification. NOTE(review): `None` presumably means "whole array" —
    /// confirm against the code that applies the write.
    pub slice_spec: Option<Value>,
    /// Value to write.
    pub value: Value,
}
99
/// A resize queued inside a `PendingTxn`.
#[derive(Debug, Clone)]
pub struct PendingResize {
    /// Name of the target array.
    pub array: String,
    /// Requested shape (extent along each axis).
    pub shape: Vec<usize>,
}
105
/// A fill queued inside a `PendingTxn`.
#[derive(Debug, Clone)]
pub struct PendingFill {
    /// Name of the target array.
    pub array: String,
    /// Optional slice specification. NOTE(review): `None` presumably means "whole array" —
    /// confirm against the code that applies the fill.
    pub slice_spec: Option<Value>,
    /// Value to fill with.
    pub value: Value,
}
112
/// An array creation queued inside a `PendingTxn`.
#[derive(Debug, Clone)]
pub struct PendingCreateArray {
    /// Name of the array to create.
    pub array: String,
    /// Metadata for the new array.
    pub meta: DataArrayMeta,
}
118
/// Lifecycle state recorded in `PendingTxn::status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxnStatus {
    /// The transaction is open.
    Open,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been aborted.
    Aborted,
}
125
126fn tx_registry() -> &'static Mutex<HashMap<String, PendingTxn>> {
127    static REG: OnceLock<Mutex<HashMap<String, PendingTxn>>> = OnceLock::new();
128    REG.get_or_init(|| Mutex::new(HashMap::new()))
129}
130
131pub fn data_error(message: impl Into<String>) -> RuntimeError {
132    build_runtime_error(message)
133        .with_identifier("RUNMAT:Data:Error")
134        .with_builtin("data")
135        .build()
136}
137
/// Builds a runtime error for the `data` builtin with a caller-chosen identifier.
///
/// `message` becomes the error text and `identifier` is attached via `with_identifier`; the
/// builtin name is always `"data"`.
fn data_error_with_identifier(
    message: impl Into<String>,
    identifier: &'static str,
) -> RuntimeError {
    build_runtime_error(message)
        .with_identifier(identifier)
        .with_builtin("data")
        .build()
}
147
/// Error identifier attached by `ensure_manifest_sequence` when the manifest sequence differs
/// from the expected one.
const DATA_MANIFEST_CONFLICT_IDENTIFIER: &str = "RunMat:data:ManifestConflict";
/// Error identifier for a missing transaction.
/// NOTE(review): not referenced in the portion of the file reviewed here — confirm its user.
const DATA_TRANSACTION_NOT_FOUND_IDENTIFIER: &str = "RunMat:data:TransactionNotFound";
150
151pub fn parse_string(value: &Value, context: &str) -> BuiltinResult<String> {
152    match value {
153        Value::String(s) => Ok(s.clone()),
154        Value::CharArray(ca) => Ok(ca.to_string()),
155        _ => Err(data_error(format!("{context}: expected string value"))),
156    }
157}
158
/// Converts a dataset path string into the owned root path of the dataset.
pub fn dataset_root(path: &str) -> PathBuf {
    Path::new(path).to_path_buf()
}
162
/// Location of the manifest file (`manifest.json`) inside the dataset rooted at `root`.
pub fn manifest_path(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("manifest.json");
    path
}
166
/// Directory (`arrays`) that holds the per-array subdirectories of the dataset rooted at
/// `root`.
pub fn arrays_root(root: &Path) -> PathBuf {
    let mut dir = PathBuf::from(root);
    dir.push("arrays");
    dir
}
170
171pub async fn write_manifest_async(root: &Path, manifest: &DataManifest) -> BuiltinResult<()> {
172    fs::create_dir_all_async(root).await.map_err(|err| {
173        data_error(format!(
174            "failed to create dataset root '{}': {err}",
175            root.display()
176        ))
177    })?;
178    let path = manifest_path(root);
179    let bytes = serde_json::to_vec_pretty(manifest)
180        .map_err(|err| data_error(format!("failed to encode manifest json: {err}")))?;
181    fs::write_async(&path, &bytes).await.map_err(|err| {
182        data_error(format!(
183            "failed to write manifest '{}': {err}",
184            path.display()
185        ))
186    })?;
187    Ok(())
188}
189
190pub async fn read_manifest_async(root: &Path) -> BuiltinResult<DataManifest> {
191    let path = manifest_path(root);
192    let bytes = fs::read_async(&path).await.map_err(|err| {
193        data_error(format!(
194            "failed to read manifest '{}': {err}",
195            path.display()
196        ))
197    })?;
198    let manifest = serde_json::from_slice::<DataManifest>(&bytes).map_err(|err| {
199        data_error(format!(
200            "failed to parse manifest '{}': {err}",
201            path.display()
202        ))
203    })?;
204    Ok(manifest)
205}
206
207pub async fn write_array_payload_async(
208    root: &Path,
209    array: &str,
210    payload: &DataArrayPayload,
211    chunk_shape: &[usize],
212) -> BuiltinResult<(PathBuf, PathBuf)> {
213    let array_dir = arrays_root(root).join(array);
214    fs::create_dir_all_async(&array_dir).await.map_err(|err| {
215        data_error(format!(
216            "failed to create array dir '{}': {err}",
217            array_dir.display()
218        ))
219    })?;
220    let payload_path = array_dir.join("data.f64.json");
221    let bytes = serde_json::to_vec(payload)
222        .map_err(|err| data_error(format!("failed to encode array payload json: {err}")))?;
223    fs::write_async(&payload_path, &bytes)
224        .await
225        .map_err(|err| {
226            data_error(format!(
227                "failed to write payload '{}': {err}",
228                payload_path.display()
229            ))
230        })?;
231
232    let chunk_dir = array_dir.join("chunks");
233    fs::create_dir_all_async(&chunk_dir).await.map_err(|err| {
234        data_error(format!(
235            "failed to create chunk dir '{}': {err}",
236            chunk_dir.display()
237        ))
238    })?;
239
240    let mut index = DataChunkIndex {
241        schema_version: 1,
242        array: array.to_string(),
243        chunks: Vec::new(),
244    };
245    let mut upload_chunks = Vec::new();
246    let grid_shape = chunk_grid_shape(&payload.shape, chunk_shape);
247    let mut coords = vec![0usize; payload.shape.len()];
248    loop {
249        let chunk_start = chunk_start_for_coords(&coords, chunk_shape);
250        let chunk_extent = chunk_extent_for_start(&chunk_start, chunk_shape, &payload.shape);
251        let chunk_payload = DataArrayPayload {
252            dtype: payload.dtype.clone(),
253            shape: chunk_extent.clone(),
254            values: collect_chunk_values(payload, &chunk_start, &chunk_extent)?,
255        };
256        let key = chunk_key(&coords);
257        let object_id = format!("obj_{}", key.replace('.', "_"));
258        let chunk_bytes = serde_json::to_vec(&chunk_payload)
259            .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
260        let data_path = chunk_dir.join(format!("{object_id}.json"));
261        fs::write_async(&data_path, &chunk_bytes)
262            .await
263            .map_err(|err| {
264                data_error(format!(
265                    "failed to write chunk '{}': {err}",
266                    data_path.display()
267                ))
268            })?;
269        let hash = sha256_hex(&chunk_bytes);
270        let rel_chunk_path = data_path
271            .strip_prefix(root)
272            .map_err(|err| data_error(format!("failed to compute chunk relative path: {err}")))?
273            .to_string_lossy()
274            .to_string();
275        index.chunks.push(DataChunkIndexEntry {
276            key: key.clone(),
277            object_id: object_id.clone(),
278            hash: hash.clone(),
279            bytes_raw: chunk_bytes.len() as u64,
280            bytes_stored: chunk_bytes.len() as u64,
281            coords: coords.clone(),
282            shape: chunk_extent,
283            data_path: rel_chunk_path,
284        });
285        upload_chunks.push((
286            DataChunkDescriptor {
287                key,
288                object_id,
289                hash,
290                bytes_raw: chunk_bytes.len() as u64,
291                bytes_stored: chunk_bytes.len() as u64,
292            },
293            chunk_bytes,
294        ));
295        if !advance_index(&mut coords, &grid_shape) {
296            break;
297        }
298    }
299
300    maybe_upload_chunks_async(root, array, upload_chunks).await?;
301
302    tracing::info!(
303        target: "runmat.data",
304        dataset = %root.display(),
305        array = array,
306        chunks = index.chunks.len(),
307        payload_bytes = bytes.len(),
308        "data chunk write planned"
309    );
310
311    let chunk_index_path = chunk_dir.join("index.json");
312    let chunk_index_bytes = serde_json::to_vec(&index)
313        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
314    fs::write_async(&chunk_index_path, &chunk_index_bytes)
315        .await
316        .map_err(|err| {
317            data_error(format!(
318                "failed to write chunk index '{}': {err}",
319                chunk_index_path.display()
320            ))
321        })?;
322    Ok((payload_path, chunk_index_path))
323}
324
325pub async fn read_array_payload_async(
326    root: &Path,
327    meta: &DataArrayMeta,
328) -> BuiltinResult<DataArrayPayload> {
329    if let Some(index_path) = &meta.chunk_index_path {
330        let path = root.join(index_path);
331        if fs::metadata_async(&path).await.is_ok() {
332            return read_array_payload_chunked_async(root, meta, &path).await;
333        }
334    }
335    let payload_path = root.join(&meta.data_path);
336    let bytes = fs::read_async(&payload_path).await.map_err(|err| {
337        data_error(format!(
338            "failed to read payload '{}': {err}",
339            payload_path.display()
340        ))
341    })?;
342    serde_json::from_slice::<DataArrayPayload>(&bytes).map_err(|err| {
343        data_error(format!(
344            "failed to parse payload '{}': {err}",
345            payload_path.display()
346        ))
347    })
348}
349
350pub async fn read_array_slice_payload_async(
351    root: &Path,
352    meta: &DataArrayMeta,
353    start: &[usize],
354    shape: &[usize],
355) -> BuiltinResult<DataArrayPayload> {
356    let (slice_start, slice_shape) = normalize_slice_bounds(&meta.shape, start, shape)?;
357    if let Some(index_path) = &meta.chunk_index_path {
358        let path = root.join(index_path);
359        if fs::metadata_async(&path).await.is_ok() {
360            return read_array_payload_chunked_slice_async(
361                root,
362                meta,
363                &path,
364                &slice_start,
365                &slice_shape,
366            )
367            .await;
368        }
369    }
370    let full = read_array_payload_async(root, meta).await?;
371    extract_slice_payload(&full, &slice_start, &slice_shape)
372}
373
374async fn read_array_payload_chunked_slice_async(
375    root: &Path,
376    meta: &DataArrayMeta,
377    index_path: &Path,
378    slice_start: &[usize],
379    slice_shape: &[usize],
380) -> BuiltinResult<DataArrayPayload> {
381    let bytes = fs::read_async(index_path).await.map_err(|err| {
382        data_error(format!(
383            "failed to read chunk index '{}': {err}",
384            index_path.display()
385        ))
386    })?;
387    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
388        data_error(format!(
389            "failed to parse chunk index '{}': {err}",
390            index_path.display()
391        ))
392    })?;
393
394    let mut values = vec![0.0; slice_shape.iter().copied().product::<usize>()];
395    for chunk in index.chunks {
396        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
397        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
398        let chunk_extent = if chunk.shape.is_empty() {
399            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
400        } else {
401            chunk.shape.clone()
402        };
403        if !chunk_intersects_slice(&chunk_start, &chunk_extent, slice_start, slice_shape) {
404            continue;
405        }
406
407        let chunk_path = root.join(&chunk.data_path);
408        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
409            data_error(format!(
410                "failed to read chunk payload '{}': {err}",
411                chunk_path.display()
412            ))
413        })?;
414        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
415            data_error(format!(
416                "failed to parse chunk payload '{}': {err}",
417                chunk_path.display()
418            ))
419        })?;
420        if payload.shape != chunk_extent {
421            return Err(data_error(format!(
422                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
423                chunk.key, payload.shape, chunk_extent
424            )));
425        }
426
427        let mut local = vec![0usize; chunk_extent.len()];
428        loop {
429            let mut global = Vec::with_capacity(chunk_extent.len());
430            for dim in 0..chunk_extent.len() {
431                global.push(chunk_start[dim] + local[dim]);
432            }
433            if coordinate_in_slice(&global, slice_start, slice_shape) {
434                let src_linear = linear_index_column_major(&local, &chunk_extent)?;
435                let mut dst = Vec::with_capacity(slice_shape.len());
436                for dim in 0..slice_shape.len() {
437                    dst.push(global[dim].saturating_sub(slice_start[dim]));
438                }
439                let dst_linear = linear_index_column_major(&dst, slice_shape)?;
440                values[dst_linear] = payload.values[src_linear];
441            }
442            if !advance_index(&mut local, &chunk_extent) {
443                break;
444            }
445        }
446    }
447
448    Ok(DataArrayPayload {
449        dtype: meta.dtype.clone(),
450        shape: slice_shape.to_vec(),
451        values,
452    })
453}
454
455async fn read_array_payload_chunked_async(
456    root: &Path,
457    meta: &DataArrayMeta,
458    index_path: &Path,
459) -> BuiltinResult<DataArrayPayload> {
460    let bytes = fs::read_async(index_path).await.map_err(|err| {
461        data_error(format!(
462            "failed to read chunk index '{}': {err}",
463            index_path.display()
464        ))
465    })?;
466    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
467        data_error(format!(
468            "failed to parse chunk index '{}': {err}",
469            index_path.display()
470        ))
471    })?;
472    let mut values = vec![0.0; meta.shape.iter().copied().product::<usize>()];
473    for chunk in index.chunks {
474        let chunk_path = root.join(&chunk.data_path);
475        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
476            data_error(format!(
477                "failed to read chunk payload '{}': {err}",
478                chunk_path.display()
479            ))
480        })?;
481        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
482            data_error(format!(
483                "failed to parse chunk payload '{}': {err}",
484                chunk_path.display()
485            ))
486        })?;
487        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
488        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
489        let chunk_extent = if chunk.shape.is_empty() {
490            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
491        } else {
492            chunk.shape.clone()
493        };
494        if payload.shape != chunk_extent {
495            return Err(data_error(format!(
496                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
497                chunk.key, payload.shape, chunk_extent
498            )));
499        }
500        let mut local = vec![0usize; chunk_extent.len()];
501        loop {
502            let mut global = Vec::with_capacity(chunk_extent.len());
503            for dim in 0..chunk_extent.len() {
504                global.push(chunk_start[dim] + local[dim]);
505            }
506            let src_linear = linear_index_column_major(&local, &chunk_extent)?;
507            let dst_linear = linear_index_column_major(&global, &meta.shape)?;
508            values[dst_linear] = payload.values[src_linear];
509            if !advance_index(&mut local, &chunk_extent) {
510                break;
511            }
512        }
513    }
514    Ok(DataArrayPayload {
515        dtype: meta.dtype.clone(),
516        shape: meta.shape.clone(),
517        values,
518    })
519}
520
521async fn maybe_upload_chunks_async(
522    root: &Path,
523    array: &str,
524    chunks: Vec<(DataChunkDescriptor, Vec<u8>)>,
525) -> BuiltinResult<()> {
526    if chunks.is_empty() {
527        return Ok(());
528    }
529    let request = DataChunkUploadRequest {
530        dataset_path: root.to_string_lossy().to_string(),
531        array: array.to_string(),
532        chunks: chunks.iter().map(|(desc, _)| desc.clone()).collect(),
533    };
534    let targets = match fs::data_chunk_upload_targets_async(&request).await {
535        Ok(targets) => targets,
536        Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
537        Err(err) => {
538            return Err(data_error(format!(
539                "failed to request data chunk upload targets: {err}"
540            )))
541        }
542    };
543    for (descriptor, bytes) in chunks {
544        let target = find_chunk_target(&targets, &descriptor.key)?;
545        fs::data_upload_chunk_async(target, &bytes)
546            .await
547            .map_err(|err| {
548                data_error(format!(
549                    "failed to upload chunk '{}': {err}",
550                    descriptor.key
551                ))
552            })?;
553        tracing::info!(
554            target: "runmat.data",
555            dataset = %root.display(),
556            array = array,
557            chunk_key = descriptor.key,
558            bytes = bytes.len(),
559            "data chunk uploaded"
560        );
561    }
562    Ok(())
563}
564
565fn find_chunk_target<'a>(
566    targets: &'a [DataChunkUploadTarget],
567    key: &str,
568) -> BuiltinResult<&'a DataChunkUploadTarget> {
569    targets
570        .iter()
571        .find(|target| target.key == key)
572        .ok_or_else(|| data_error(format!("missing upload target for chunk '{key}'")))
573}
574
575pub fn sha256_hex(bytes: &[u8]) -> String {
576    let mut hasher = Sha256::new();
577    hasher.update(bytes);
578    let digest = hasher.finalize();
579    format!("sha256:{:x}", digest)
580}
581
/// Formats chunk grid coordinates as a key: the decimal coordinates joined with `.`
/// (an empty coordinate list gives an empty key).
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (position, coord) in coords.iter().enumerate() {
        if position > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
589
/// Number of chunks needed along each axis to cover `shape`.
///
/// An axis without a `chunk_shape` entry, or with a chunk size of zero, uses a chunk size
/// of 1. The per-axis count is the extent divided by the chunk size, rounded up.
fn chunk_grid_shape(shape: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut grid = Vec::with_capacity(shape.len());
    for (axis, &extent) in shape.iter().enumerate() {
        let step = match chunk_shape.get(axis) {
            Some(&size) if size > 0 => size,
            _ => 1,
        };
        // Ceiling division without risking overflow in `extent + step - 1`.
        let whole = extent / step;
        let partial = usize::from(extent % step != 0);
        grid.push(whole + partial);
    }
    grid
}
600
/// Converts chunk grid coordinates into the element index at which each chunk begins.
///
/// Each coordinate is multiplied by the chunk size of its axis; an axis without a
/// `chunk_shape` entry, or with a chunk size of zero, uses a chunk size of 1.
///
/// Coordinates may originate from an on-disk chunk index, so the multiplication saturates at
/// `usize::MAX` instead of overflowing (which would panic in debug builds and silently wrap
/// to an unrelated position in release builds).
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    coords
        .iter()
        .enumerate()
        .map(|(axis, &coord)| {
            let step = chunk_shape.get(axis).copied().unwrap_or(1).max(1);
            coord.saturating_mul(step)
        })
        .collect()
}
608
/// Extent of the chunk that begins at `start`, clipped to the bounds of `full_shape`.
///
/// For each axis the chunk nominally spans `chunk_shape[axis]` elements (1 when the entry is
/// missing or zero) and is truncated where it would run past `full_shape[axis]`. A start at
/// or beyond the end of an axis gives an extent of 0.
///
/// The arithmetic saturates rather than overflowing, and an axis that `full_shape` does not
/// have is treated as having length 0, so malformed inputs produce a zero extent instead of a
/// panic.
fn chunk_extent_for_start(
    start: &[usize],
    chunk_shape: &[usize],
    full_shape: &[usize],
) -> Vec<usize> {
    start
        .iter()
        .enumerate()
        .map(|(axis, &origin)| {
            let step = chunk_shape.get(axis).copied().unwrap_or(1).max(1);
            let limit = full_shape.get(axis).copied().unwrap_or(0);
            let end = origin.saturating_add(step).min(limit);
            end.saturating_sub(origin)
        })
        .collect()
}
624
625fn collect_chunk_values(
626    payload: &DataArrayPayload,
627    chunk_start: &[usize],
628    chunk_extent: &[usize],
629) -> BuiltinResult<Vec<f64>> {
630    let mut local = vec![0usize; chunk_extent.len()];
631    let mut values = Vec::with_capacity(chunk_extent.iter().copied().product());
632    loop {
633        let mut global = Vec::with_capacity(chunk_extent.len());
634        for dim in 0..chunk_extent.len() {
635            global.push(chunk_start[dim] + local[dim]);
636        }
637        let linear = linear_index_column_major(&global, &payload.shape)?;
638        values.push(payload.values[linear]);
639        if !advance_index(&mut local, chunk_extent) {
640            break;
641        }
642    }
643    Ok(values)
644}
645
646fn chunk_coords_from_entry(entry: &DataChunkIndexEntry, rank: usize) -> BuiltinResult<Vec<usize>> {
647    if !entry.coords.is_empty() {
648        if entry.coords.len() != rank {
649            return Err(data_error(format!(
650                "chunk coords rank mismatch for key '{}': expected {rank}, got {}",
651                entry.key,
652                entry.coords.len()
653            )));
654        }
655        return Ok(entry.coords.clone());
656    }
657    let coords = entry
658        .key
659        .split('.')
660        .map(|part| {
661            part.parse::<usize>()
662                .map_err(|_| data_error(format!("invalid chunk key '{}'", entry.key)))
663        })
664        .collect::<BuiltinResult<Vec<_>>>()?;
665    if coords.len() != rank {
666        return Err(data_error(format!(
667            "chunk key rank mismatch for key '{}': expected {rank}, got {}",
668            entry.key,
669            coords.len()
670        )));
671    }
672    Ok(coords)
673}
674
675fn normalize_slice_bounds(
676    full_shape: &[usize],
677    start: &[usize],
678    shape: &[usize],
679) -> BuiltinResult<(Vec<usize>, Vec<usize>)> {
680    if full_shape.is_empty() {
681        return Ok((Vec::new(), Vec::new()));
682    }
683    let mut normalized_start = Vec::with_capacity(full_shape.len());
684    let mut normalized_shape = Vec::with_capacity(full_shape.len());
685    for (axis, axis_len) in full_shape.iter().copied().enumerate() {
686        if axis_len == 0 {
687            return Err(data_error("slice axis length must be greater than zero"));
688        }
689        let requested_start = start.get(axis).copied().unwrap_or(0);
690        let clamped_start = requested_start.min(axis_len.saturating_sub(1));
691        let requested_span = shape.get(axis).copied().unwrap_or(axis_len);
692        let clamped_span = requested_span
693            .max(1)
694            .min(axis_len.saturating_sub(clamped_start));
695        normalized_start.push(clamped_start);
696        normalized_shape.push(clamped_span);
697    }
698    Ok((normalized_start, normalized_shape))
699}
700
/// Tells whether the coordinate `global` lies inside the slice that begins at `slice_start`
/// and spans `slice_shape`, checking one axis per entry of `slice_shape`.
fn coordinate_in_slice(global: &[usize], slice_start: &[usize], slice_shape: &[usize]) -> bool {
    (0..slice_shape.len()).all(|dim| {
        let lower = slice_start[dim];
        let upper = lower.saturating_add(slice_shape[dim]);
        (lower..upper).contains(&global[dim])
    })
}
712
/// Tells whether the chunk box (`chunk_start`, `chunk_extent`) and the slice box
/// (`slice_start`, `slice_shape`) overlap on every axis of `slice_shape`.
///
/// Both boxes are half-open ranges per axis, so boxes that merely touch do not overlap.
fn chunk_intersects_slice(
    chunk_start: &[usize],
    chunk_extent: &[usize],
    slice_start: &[usize],
    slice_shape: &[usize],
) -> bool {
    (0..slice_shape.len()).all(|dim| {
        let chunk_begin = chunk_start[dim];
        let chunk_end = chunk_begin.saturating_add(chunk_extent[dim]);
        let slice_begin = slice_start[dim];
        let slice_end = slice_begin.saturating_add(slice_shape[dim]);
        chunk_end > slice_begin && slice_end > chunk_begin
    })
}
730
731fn extract_slice_payload(
732    payload: &DataArrayPayload,
733    start: &[usize],
734    shape: &[usize],
735) -> BuiltinResult<DataArrayPayload> {
736    let mut values = Vec::with_capacity(shape.iter().copied().product());
737    if shape.is_empty() {
738        return Ok(DataArrayPayload {
739            dtype: payload.dtype.clone(),
740            shape: Vec::new(),
741            values,
742        });
743    }
744    let mut local = vec![0usize; shape.len()];
745    loop {
746        let mut global = Vec::with_capacity(shape.len());
747        for dim in 0..shape.len() {
748            global.push(start[dim] + local[dim]);
749        }
750        let linear = linear_index_column_major(&global, &payload.shape)?;
751        values.push(payload.values[linear]);
752        if !advance_index(&mut local, shape) {
753            break;
754        }
755    }
756    Ok(DataArrayPayload {
757        dtype: payload.dtype.clone(),
758        shape: shape.to_vec(),
759        values,
760    })
761}
762
763fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
764    if index.len() != shape.len() {
765        return Err(data_error("chunk index rank mismatch"));
766    }
767    let mut stride = 1usize;
768    let mut linear = 0usize;
769    for (idx, extent) in index.iter().zip(shape.iter()) {
770        if *idx >= *extent {
771            return Err(data_error("chunk index out of bounds"));
772        }
773        linear += idx * stride;
774        stride = stride.saturating_mul(*extent);
775    }
776    Ok(linear)
777}
778
/// Steps `index` to the next coordinate inside `shape`, first axis fastest (odometer style).
///
/// Returns `true` when a next coordinate exists. Returns `false` once every axis has wrapped,
/// in which case `index` is back at all zeros (also for an empty `shape`).
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    for (dim, &extent) in shape.iter().enumerate() {
        let slot = &mut index[dim];
        *slot += 1;
        if *slot < extent {
            return true;
        }
        // This axis overflowed: reset it and carry into the next one.
        *slot = 0;
    }
    false
}
792
793pub fn parse_schema(schema: &Value) -> BuiltinResult<DataSchema> {
794    let Value::Struct(schema_struct) = schema else {
795        return Err(data_error("data.create: schema must be a struct"));
796    };
797    let arrays_value = schema_struct
798        .fields
799        .get("arrays")
800        .ok_or_else(|| data_error("data.create: schema missing 'arrays' field"))?;
801    let Value::Struct(arrays_struct) = arrays_value else {
802        return Err(data_error("data.create: schema.arrays must be a struct"));
803    };
804
805    let mut arrays = BTreeMap::new();
806    for (name, meta_value) in &arrays_struct.fields {
807        let Value::Struct(meta_struct) = meta_value else {
808            return Err(data_error(format!(
809                "data.create: schema.arrays.{name} must be a struct"
810            )));
811        };
812        let dtype = meta_struct
813            .fields
814            .get("dtype")
815            .map(|v| parse_string(v, "data.create schema dtype"))
816            .transpose()?
817            .unwrap_or_else(|| "f64".to_string());
818        let shape = meta_struct
819            .fields
820            .get("shape")
821            .map(parse_usize_vector)
822            .transpose()?
823            .unwrap_or_else(|| vec![0, 0]);
824        let chunk_shape = meta_struct
825            .fields
826            .get("chunk")
827            .map(parse_usize_vector)
828            .transpose()?
829            .unwrap_or_else(|| default_chunk_shape(&shape));
830        let codec = meta_struct
831            .fields
832            .get("codec")
833            .map(|v| parse_string(v, "data.create schema codec"))
834            .transpose()?
835            .unwrap_or_else(|| "zstd".to_string());
836        let data_path = format!("arrays/{name}/data.f64.json");
837        let chunk_index_path = format!("arrays/{name}/chunks/index.json");
838        arrays.insert(
839            name.clone(),
840            DataArrayMeta {
841                dtype,
842                shape,
843                chunk_shape,
844                order: default_array_order(),
845                codec,
846                chunk_index_path: Some(chunk_index_path),
847                data_path,
848            },
849        );
850    }
851
852    Ok(DataSchema { arrays })
853}
854
/// Picks a chunk shape for an array whose schema does not specify one.
///
/// - rank 0: `[1024]`;
/// - rank 1: the extent clamped to `1..=65_536`;
/// - higher ranks: the first two extents clamped to `1..=256`, every further extent clamped
///   to `1..=8`.
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape {
        [] => vec![1024],
        [only] => vec![(*only).clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(axis, &extent)| {
                let cap = if axis < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
871
872fn parse_usize_vector(value: &Value) -> BuiltinResult<Vec<usize>> {
873    match value {
874        Value::Tensor(t) => tensor_to_usize_vector(t),
875        Value::Num(n) => {
876            if *n < 0.0 || !n.is_finite() {
877                return Err(data_error(
878                    "data schema dimensions must be non-negative finite numbers",
879                ));
880            }
881            Ok(vec![*n as usize])
882        }
883        Value::Int(i) => {
884            let n = i.to_i64();
885            if n < 0 {
886                return Err(data_error("data schema dimensions must be non-negative"));
887            }
888            Ok(vec![n as usize])
889        }
890        _ => Err(data_error(
891            "data schema dimension field must be numeric tensor/vector",
892        )),
893    }
894}
895
896fn tensor_to_usize_vector(t: &Tensor) -> BuiltinResult<Vec<usize>> {
897    let mut out = Vec::with_capacity(t.data.len());
898    for value in &t.data {
899        if !value.is_finite() || *value < 0.0 {
900            return Err(data_error(
901                "data schema dimensions must be non-negative finite numbers",
902            ));
903        }
904        out.push(*value as usize);
905    }
906    Ok(out)
907}
908
909pub fn dataset_object(path: &str, manifest: &DataManifest) -> Value {
910    let mut obj = ObjectInstance::new("Dataset".to_string());
911    obj.properties
912        .insert("__data_path".to_string(), Value::String(path.to_string()));
913    obj.properties.insert(
914        "__data_id".to_string(),
915        Value::String(manifest.dataset_id.clone()),
916    );
917    obj.properties.insert(
918        "__data_version".to_string(),
919        Value::String(manifest_version_token(manifest)),
920    );
921    Value::Object(obj)
922}
923
924pub fn manifest_version_token(manifest: &DataManifest) -> String {
925    format!("{}:{}", manifest.updated_at, manifest.txn_sequence)
926}
927
928pub fn ensure_manifest_sequence(expected: u64, manifest: &DataManifest) -> BuiltinResult<()> {
929    if manifest.txn_sequence != expected {
930        tracing::warn!(
931            target: "runmat.data",
932            expected_sequence = expected,
933            actual_sequence = manifest.txn_sequence,
934            "manifest conflict detected"
935        );
936        return Err(data_error_with_identifier(
937            "MANIFEST_CONFLICT: dataset changed since transaction begin",
938            DATA_MANIFEST_CONFLICT_IDENTIFIER,
939        ));
940    }
941    Ok(())
942}
943
944pub fn array_object(dataset_path: &str, array_name: &str) -> Value {
945    let mut obj = ObjectInstance::new("DataArray".to_string());
946    obj.properties.insert(
947        "__data_path".to_string(),
948        Value::String(dataset_path.to_string()),
949    );
950    obj.properties.insert(
951        "__array_name".to_string(),
952        Value::String(array_name.to_string()),
953    );
954    Value::Object(obj)
955}
956
957pub fn transaction_object(dataset_path: &str, tx_id: &str) -> Value {
958    let mut obj = ObjectInstance::new("DataTransaction".to_string());
959    obj.properties.insert(
960        "__data_path".to_string(),
961        Value::String(dataset_path.to_string()),
962    );
963    obj.properties
964        .insert("__tx_id".to_string(), Value::String(tx_id.to_string()));
965    Value::Object(obj)
966}
967
968pub fn get_object_prop<'a>(obj: &'a ObjectInstance, key: &str) -> BuiltinResult<&'a Value> {
969    obj.properties
970        .get(key)
971        .ok_or_else(|| data_error(format!("object missing internal property '{key}'")))
972}
973
974pub fn now_rfc3339() -> String {
975    Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
976}
977
978pub fn new_dataset_id() -> String {
979    static NEXT_DATASET_ID: AtomicU64 = AtomicU64::new(1);
980    let seq = NEXT_DATASET_ID.fetch_add(1, Ordering::Relaxed);
981    format!("ds_{}_{}", Utc::now().timestamp_millis(), seq)
982}
983
984pub fn new_tx_id() -> String {
985    static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);
986    let seq = NEXT_TX_ID.fetch_add(1, Ordering::Relaxed);
987    format!("tx_{}_{}", Utc::now().timestamp_millis(), seq)
988}
989
990pub fn start_tx(dataset_path: String, base_sequence: u64) -> String {
991    let tx_id = new_tx_id();
992    let pending = PendingTxn {
993        dataset_path,
994        base_sequence,
995        writes: Vec::new(),
996        resizes: Vec::new(),
997        fills: Vec::new(),
998        create_arrays: Vec::new(),
999        delete_arrays: Vec::new(),
1000        attrs: BTreeMap::new(),
1001        status: TxnStatus::Open,
1002    };
1003    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1004    guard.insert(tx_id.clone(), pending);
1005    tx_id
1006}
1007
1008pub fn with_tx_mut<T>(
1009    tx_id: &str,
1010    f: impl FnOnce(&mut PendingTxn) -> BuiltinResult<T>,
1011) -> BuiltinResult<T> {
1012    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1013    let tx = guard.get_mut(tx_id).ok_or_else(|| {
1014        data_error_with_identifier(
1015            format!("transaction '{tx_id}' not found"),
1016            DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1017        )
1018    })?;
1019    f(tx)
1020}
1021
1022pub fn with_tx<T>(
1023    tx_id: &str,
1024    f: impl FnOnce(&PendingTxn) -> BuiltinResult<T>,
1025) -> BuiltinResult<T> {
1026    let guard = tx_registry().lock().expect("tx registry lock poisoned");
1027    let tx = guard.get(tx_id).ok_or_else(|| {
1028        data_error_with_identifier(
1029            format!("transaction '{tx_id}' not found"),
1030            DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1031        )
1032    })?;
1033    f(tx)
1034}
1035
1036pub fn remove_tx(tx_id: &str) {
1037    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1038    let _ = guard.remove(tx_id);
1039}
1040
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an empty manifest fixture that sits at the given transaction sequence.
    fn manifest_at_sequence(txn_sequence: u64) -> DataManifest {
        let timestamp = "2026-03-01T00:00:00Z";
        DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: timestamp.to_string(),
            updated_at: timestamp.to_string(),
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence,
        }
    }

    #[test]
    fn ensure_manifest_sequence_accepts_matching_sequence() {
        let manifest = manifest_at_sequence(5);
        ensure_manifest_sequence(5, &manifest).expect("expected sequence match");
    }

    #[test]
    fn ensure_manifest_sequence_rejects_conflict() {
        // The manifest has moved on to sequence 6 while the caller still expects 5.
        let manifest = manifest_at_sequence(6);
        let err = ensure_manifest_sequence(5, &manifest).expect_err("expected conflict error");
        assert_eq!(
            err.identifier(),
            Some(DATA_MANIFEST_CONFLICT_IDENTIFIER),
            "manifest conflicts should expose a stable identifier"
        );
    }

    #[test]
    fn transaction_registry_roundtrip() {
        // A freshly started transaction is visible and open.
        let tx_id = start_tx("/datasets/test.data".to_string(), 7);
        let status = with_tx(&tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
        assert_eq!(status, TxnStatus::Open);

        // After removal, the same lookup must fail with the stable identifier.
        remove_tx(&tx_id);
        let err = with_tx(&tx_id, |_| Ok(())).expect_err("expected missing tx");
        assert_eq!(
            err.identifier(),
            Some(DATA_TRANSACTION_NOT_FOUND_IDENTIFIER),
            "missing transaction lookups should expose a stable identifier"
        );
    }

    #[test]
    fn sha256_hash_format_matches_expected_prefix() {
        // Expected shape: the scheme prefix followed by 64 further characters.
        let prefix = "sha256:";
        let hash = sha256_hex(b"runmat");
        assert!(hash.starts_with(prefix));
        assert_eq!(hash.len(), prefix.len() + 64);
    }
}