// runmat_runtime/data/mod.rs

1use std::collections::{BTreeMap, HashMap};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Mutex, OnceLock};
5
6use chrono::Utc;
7use runmat_builtins::{ObjectInstance, Tensor, Value};
8use runmat_filesystem as fs;
9use runmat_filesystem::data_contract::{
10    DataChunkDescriptor, DataChunkUploadRequest, DataChunkUploadTarget,
11};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14
15use crate::{build_runtime_error, BuiltinResult, RuntimeError};
16
/// On-disk manifest describing a dataset: identity, timestamps, the arrays it
/// contains, free-form attributes, and a transaction sequence number used for
/// optimistic concurrency control (see `ensure_manifest_sequence`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataManifest {
    /// Manifest schema version, for forward/backward compatibility.
    pub schema_version: u32,
    /// Storage format identifier.
    pub format: String,
    /// Unique dataset identifier (see `new_dataset_id`).
    pub dataset_id: String,
    /// Optional human-readable dataset name.
    pub name: Option<String>,
    /// Creation timestamp (presumably RFC 3339 via `now_rfc3339` — confirm at call sites).
    pub created_at: String,
    /// Timestamp of the most recent update; part of `manifest_version_token`.
    pub updated_at: String,
    /// Per-array metadata keyed by array name (BTreeMap keeps JSON output stable).
    pub arrays: BTreeMap<String, DataArrayMeta>,
    /// Arbitrary dataset-level attributes.
    pub attrs: BTreeMap<String, serde_json::Value>,
    /// Monotonic transaction counter; a mismatch signals a manifest conflict.
    pub txn_sequence: u64,
}
29
/// Per-array metadata stored in the dataset manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayMeta {
    /// Element type tag (e.g. "f64").
    pub dtype: String,
    /// Full logical shape of the array.
    pub shape: Vec<usize>,
    /// Shape of a single chunk; chunks tile the array.
    pub chunk_shape: Vec<usize>,
    /// Element ordering; defaults to "column_major" when absent in JSON.
    #[serde(default = "default_array_order")]
    pub order: String,
    /// Compression codec tag.
    pub codec: String,
    /// Dataset-relative path to the chunk index, when the array is chunked.
    #[serde(default)]
    pub chunk_index_path: Option<String>,
    /// Dataset-relative path to the monolithic payload file.
    pub data_path: String,
}
42
/// Serde default for `DataArrayMeta::order`: arrays are column-major.
fn default_array_order() -> String {
    String::from("column_major")
}
46
/// JSON payload of an array (or of a single chunk): a dtype tag, a shape, and
/// the element values flattened in column-major order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayPayload {
    /// Element type tag (e.g. "f64").
    pub dtype: String,
    /// Shape of the payload.
    pub shape: Vec<usize>,
    /// Flattened values, column-major (see `linear_index_column_major`).
    pub values: Vec<f64>,
}
53
/// Index file (`chunks/index.json`) listing every chunk written for one array.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndex {
    /// Index schema version.
    pub schema_version: u32,
    /// Name of the array this index belongs to.
    pub array: String,
    /// One entry per chunk, in write order.
    pub chunks: Vec<DataChunkIndexEntry>,
}
60
/// A single chunk recorded in a `DataChunkIndex`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndexEntry {
    /// Dotted grid coordinates, e.g. "0.2.1" (see `chunk_key`).
    pub key: String,
    /// Object identifier derived from the key.
    pub object_id: String,
    /// "sha256:<hex>" digest of the stored chunk bytes (see `sha256_hex`).
    pub hash: String,
    /// Uncompressed size in bytes.
    pub bytes_raw: u64,
    /// Stored size in bytes.
    pub bytes_stored: u64,
    /// Explicit grid coordinates; older entries may omit this and rely on `key`.
    #[serde(default)]
    pub coords: Vec<usize>,
    /// Chunk extent; older entries may omit this and it is recomputed from geometry.
    #[serde(default)]
    pub shape: Vec<usize>,
    /// Dataset-relative path of the chunk payload file.
    pub data_path: String,
}
74
/// Parsed result of a `data.create` schema struct (see `parse_schema`).
#[derive(Debug, Clone)]
pub struct DataSchema {
    /// Array metadata keyed by array name.
    pub arrays: BTreeMap<String, DataArrayMeta>,
}
79
80#[derive(Debug, Clone)]
81pub struct PendingTxn {
82    pub dataset_path: String,
83    pub base_sequence: u64,
84    pub writes: Vec<PendingWrite>,
85    pub resizes: Vec<PendingResize>,
86    pub fills: Vec<PendingFill>,
87    pub create_arrays: Vec<PendingCreateArray>,
88    pub delete_arrays: Vec<String>,
89    pub attrs: BTreeMap<String, Value>,
90    pub status: TxnStatus,
91}
92
/// A buffered write of `value` into `array`, optionally restricted to a slice.
#[derive(Debug, Clone)]
pub struct PendingWrite {
    /// Target array name.
    pub array: String,
    /// Optional slice specification; `None` means the whole array.
    pub slice_spec: Option<Value>,
    /// Value to write.
    pub value: Value,
}
99
/// A buffered resize of `array` to `shape`.
#[derive(Debug, Clone)]
pub struct PendingResize {
    /// Target array name.
    pub array: String,
    /// New shape.
    pub shape: Vec<usize>,
}
105
/// A buffered fill of `array` (or a slice of it) with `value`.
#[derive(Debug, Clone)]
pub struct PendingFill {
    /// Target array name.
    pub array: String,
    /// Optional slice specification; `None` means the whole array.
    pub slice_spec: Option<Value>,
    /// Fill value.
    pub value: Value,
}
112
/// A buffered creation of a new array with the given metadata.
#[derive(Debug, Clone)]
pub struct PendingCreateArray {
    /// New array name.
    pub array: String,
    /// Metadata to record in the manifest.
    pub meta: DataArrayMeta,
}
118
/// Lifecycle state of a `PendingTxn`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxnStatus {
    /// Transaction is open and accepting buffered operations.
    Open,
    /// Transaction was committed.
    Committed,
    /// Transaction was aborted.
    Aborted,
}
125
126fn tx_registry() -> &'static Mutex<HashMap<String, PendingTxn>> {
127    static REG: OnceLock<Mutex<HashMap<String, PendingTxn>>> = OnceLock::new();
128    REG.get_or_init(|| Mutex::new(HashMap::new()))
129}
130
131pub fn data_error(message: impl Into<String>) -> RuntimeError {
132    build_runtime_error(message)
133        .with_identifier("RUNMAT:Data:Error")
134        .with_builtin("data")
135        .build()
136}
137
138pub fn parse_string(value: &Value, context: &str) -> BuiltinResult<String> {
139    match value {
140        Value::String(s) => Ok(s.clone()),
141        Value::CharArray(ca) => Ok(ca.to_string()),
142        _ => Err(data_error(format!("{context}: expected string value"))),
143    }
144}
145
/// Resolve a dataset path string to its filesystem root.
pub fn dataset_root(path: &str) -> PathBuf {
    Path::new(path).to_path_buf()
}
149
/// Path of the dataset manifest file inside `root`.
pub fn manifest_path(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("manifest.json");
    path
}
153
/// Directory inside `root` that holds per-array subdirectories.
pub fn arrays_root(root: &Path) -> PathBuf {
    [root, Path::new("arrays")].iter().collect()
}
157
/// Serialize `manifest` as pretty-printed JSON and write it to
/// `<root>/manifest.json`, creating the dataset root directory first.
///
/// # Errors
/// Fails if the root directory cannot be created, the manifest cannot be
/// encoded, or the file cannot be written.
pub async fn write_manifest_async(root: &Path, manifest: &DataManifest) -> BuiltinResult<()> {
    fs::create_dir_all_async(root).await.map_err(|err| {
        data_error(format!(
            "failed to create dataset root '{}': {err}",
            root.display()
        ))
    })?;
    let path = manifest_path(root);
    let bytes = serde_json::to_vec_pretty(manifest)
        .map_err(|err| data_error(format!("failed to encode manifest json: {err}")))?;
    fs::write_async(&path, &bytes).await.map_err(|err| {
        data_error(format!(
            "failed to write manifest '{}': {err}",
            path.display()
        ))
    })?;
    Ok(())
}
176
/// Read and parse `<root>/manifest.json` into a `DataManifest`.
///
/// # Errors
/// Fails if the file cannot be read or its JSON does not match the manifest
/// schema.
pub async fn read_manifest_async(root: &Path) -> BuiltinResult<DataManifest> {
    let path = manifest_path(root);
    let bytes = fs::read_async(&path).await.map_err(|err| {
        data_error(format!(
            "failed to read manifest '{}': {err}",
            path.display()
        ))
    })?;
    let manifest = serde_json::from_slice::<DataManifest>(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse manifest '{}': {err}",
            path.display()
        ))
    })?;
    Ok(manifest)
}
193
/// Persist a full array payload under `<root>/arrays/<array>/`.
///
/// Writes the monolithic payload file (`data.f64.json`), tiles the payload
/// into chunks of `chunk_shape`, writes each chunk under `chunks/`, records
/// all chunks in a chunk index (`chunks/index.json`), and offers the chunk
/// bytes to the filesystem backend for remote upload (best-effort; see
/// `maybe_upload_chunks_async`).
///
/// Returns `(payload_path, chunk_index_path)`.
///
/// # Errors
/// Fails on any directory-creation, serialization, or write error, or if a
/// chunk's dataset-relative path cannot be computed.
pub async fn write_array_payload_async(
    root: &Path,
    array: &str,
    payload: &DataArrayPayload,
    chunk_shape: &[usize],
) -> BuiltinResult<(PathBuf, PathBuf)> {
    let array_dir = arrays_root(root).join(array);
    fs::create_dir_all_async(&array_dir).await.map_err(|err| {
        data_error(format!(
            "failed to create array dir '{}': {err}",
            array_dir.display()
        ))
    })?;
    let payload_path = array_dir.join("data.f64.json");
    let bytes = serde_json::to_vec(payload)
        .map_err(|err| data_error(format!("failed to encode array payload json: {err}")))?;
    fs::write_async(&payload_path, &bytes)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to write payload '{}': {err}",
                payload_path.display()
            ))
        })?;

    let chunk_dir = array_dir.join("chunks");
    fs::create_dir_all_async(&chunk_dir).await.map_err(|err| {
        data_error(format!(
            "failed to create chunk dir '{}': {err}",
            chunk_dir.display()
        ))
    })?;

    let mut index = DataChunkIndex {
        schema_version: 1,
        array: array.to_string(),
        chunks: Vec::new(),
    };
    let mut upload_chunks = Vec::new();
    let grid_shape = chunk_grid_shape(&payload.shape, chunk_shape);
    // Walk the chunk grid in column-major order. This is a do-while loop: it
    // always runs at least once, so a rank-0 (scalar) payload still emits a
    // single chunk.
    let mut coords = vec![0usize; payload.shape.len()];
    loop {
        let chunk_start = chunk_start_for_coords(&coords, chunk_shape);
        let chunk_extent = chunk_extent_for_start(&chunk_start, chunk_shape, &payload.shape);
        let chunk_payload = DataArrayPayload {
            dtype: payload.dtype.clone(),
            shape: chunk_extent.clone(),
            values: collect_chunk_values(payload, &chunk_start, &chunk_extent)?,
        };
        // File name is derived from the dotted grid key: "0.1" -> "obj_0_1.json".
        let key = chunk_key(&coords);
        let object_id = format!("obj_{}", key.replace('.', "_"));
        let chunk_bytes = serde_json::to_vec(&chunk_payload)
            .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
        let data_path = chunk_dir.join(format!("{object_id}.json"));
        fs::write_async(&data_path, &chunk_bytes)
            .await
            .map_err(|err| {
                data_error(format!(
                    "failed to write chunk '{}': {err}",
                    data_path.display()
                ))
            })?;
        let hash = sha256_hex(&chunk_bytes);
        // Index entries store paths relative to the dataset root so the
        // dataset directory stays relocatable.
        let rel_chunk_path = data_path
            .strip_prefix(root)
            .map_err(|err| data_error(format!("failed to compute chunk relative path: {err}")))?
            .to_string_lossy()
            .to_string();
        index.chunks.push(DataChunkIndexEntry {
            key: key.clone(),
            object_id: object_id.clone(),
            hash: hash.clone(),
            bytes_raw: chunk_bytes.len() as u64,
            bytes_stored: chunk_bytes.len() as u64,
            coords: coords.clone(),
            shape: chunk_extent,
            data_path: rel_chunk_path,
        });
        upload_chunks.push((
            DataChunkDescriptor {
                key,
                object_id,
                hash,
                bytes_raw: chunk_bytes.len() as u64,
                bytes_stored: chunk_bytes.len() as u64,
            },
            chunk_bytes,
        ));
        if !advance_index(&mut coords, &grid_shape) {
            break;
        }
    }

    maybe_upload_chunks_async(root, array, upload_chunks).await?;

    tracing::info!(
        target: "runmat.data",
        dataset = %root.display(),
        array = array,
        chunks = index.chunks.len(),
        payload_bytes = bytes.len(),
        "data chunk write planned"
    );

    let chunk_index_path = chunk_dir.join("index.json");
    let chunk_index_bytes = serde_json::to_vec(&index)
        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
    fs::write_async(&chunk_index_path, &chunk_index_bytes)
        .await
        .map_err(|err| {
            data_error(format!(
                "failed to write chunk index '{}': {err}",
                chunk_index_path.display()
            ))
        })?;
    Ok((payload_path, chunk_index_path))
}
311
/// Read the full payload of an array.
///
/// Prefers the chunked representation when the manifest records a chunk
/// index and that file exists; otherwise falls back to the monolithic
/// payload file at `meta.data_path`.
///
/// # Errors
/// Fails if neither representation can be read or parsed.
pub async fn read_array_payload_async(
    root: &Path,
    meta: &DataArrayMeta,
) -> BuiltinResult<DataArrayPayload> {
    if let Some(index_path) = &meta.chunk_index_path {
        let path = root.join(index_path);
        // Only take the chunked path when the index file is actually present.
        if fs::metadata_async(&path).await.is_ok() {
            return read_array_payload_chunked_async(root, meta, &path).await;
        }
    }
    let payload_path = root.join(&meta.data_path);
    let bytes = fs::read_async(&payload_path).await.map_err(|err| {
        data_error(format!(
            "failed to read payload '{}': {err}",
            payload_path.display()
        ))
    })?;
    serde_json::from_slice::<DataArrayPayload>(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse payload '{}': {err}",
            payload_path.display()
        ))
    })
}
336
/// Read a rectangular slice of an array, given a requested `start` and
/// `shape` (both clamped to the array bounds by `normalize_slice_bounds`).
///
/// Uses the chunked representation when available, reading only the chunks
/// that intersect the slice; otherwise loads the whole payload and extracts
/// the slice in memory.
///
/// # Errors
/// Fails if the slice bounds are invalid or the underlying reads fail.
pub async fn read_array_slice_payload_async(
    root: &Path,
    meta: &DataArrayMeta,
    start: &[usize],
    shape: &[usize],
) -> BuiltinResult<DataArrayPayload> {
    let (slice_start, slice_shape) = normalize_slice_bounds(&meta.shape, start, shape)?;
    if let Some(index_path) = &meta.chunk_index_path {
        let path = root.join(index_path);
        if fs::metadata_async(&path).await.is_ok() {
            return read_array_payload_chunked_slice_async(
                root,
                meta,
                &path,
                &slice_start,
                &slice_shape,
            )
            .await;
        }
    }
    // Fallback: no chunk index on disk — slice out of the monolithic payload.
    let full = read_array_payload_async(root, meta).await?;
    extract_slice_payload(&full, &slice_start, &slice_shape)
}
360
/// Assemble a slice of an array from its chunk files.
///
/// Reads the chunk index, skips chunks that do not intersect the requested
/// slice, and copies each intersecting element from chunk-local coordinates
/// into the slice-local output buffer (both column-major).
///
/// # Errors
/// Fails on index/chunk read or parse errors, on rank mismatches, or when a
/// chunk payload's shape disagrees with its expected extent.
async fn read_array_payload_chunked_slice_async(
    root: &Path,
    meta: &DataArrayMeta,
    index_path: &Path,
    slice_start: &[usize],
    slice_shape: &[usize],
) -> BuiltinResult<DataArrayPayload> {
    let bytes = fs::read_async(index_path).await.map_err(|err| {
        data_error(format!(
            "failed to read chunk index '{}': {err}",
            index_path.display()
        ))
    })?;
    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse chunk index '{}': {err}",
            index_path.display()
        ))
    })?;

    // Output buffer is zero-filled; intersecting chunks overwrite their part.
    let mut values = vec![0.0; slice_shape.iter().copied().product::<usize>()];
    for chunk in index.chunks {
        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
        // Legacy index entries may omit `shape`; recompute it from geometry.
        let chunk_extent = if chunk.shape.is_empty() {
            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
        } else {
            chunk.shape.clone()
        };
        if !chunk_intersects_slice(&chunk_start, &chunk_extent, slice_start, slice_shape) {
            continue;
        }

        let chunk_path = root.join(&chunk.data_path);
        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
            data_error(format!(
                "failed to read chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
            data_error(format!(
                "failed to parse chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        if payload.shape != chunk_extent {
            return Err(data_error(format!(
                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
                chunk.key, payload.shape, chunk_extent
            )));
        }

        // Do-while odometer over chunk-local coordinates.
        // NOTE(review): this loop executes at least once even if a dimension
        // of `chunk_extent` is zero-length; confirm empty chunks are never
        // written, otherwise this would error in linear_index_column_major.
        let mut local = vec![0usize; chunk_extent.len()];
        loop {
            let mut global = Vec::with_capacity(chunk_extent.len());
            for dim in 0..chunk_extent.len() {
                global.push(chunk_start[dim] + local[dim]);
            }
            if coordinate_in_slice(&global, slice_start, slice_shape) {
                let src_linear = linear_index_column_major(&local, &chunk_extent)?;
                let mut dst = Vec::with_capacity(slice_shape.len());
                for dim in 0..slice_shape.len() {
                    dst.push(global[dim].saturating_sub(slice_start[dim]));
                }
                let dst_linear = linear_index_column_major(&dst, slice_shape)?;
                values[dst_linear] = payload.values[src_linear];
            }
            if !advance_index(&mut local, &chunk_extent) {
                break;
            }
        }
    }

    Ok(DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: slice_shape.to_vec(),
        values,
    })
}
441
/// Reconstruct a full array payload from its chunk files.
///
/// Reads the chunk index, then copies every chunk's elements from chunk-local
/// coordinates into the full-array buffer (both column-major). Positions not
/// covered by any chunk remain 0.0.
///
/// # Errors
/// Fails on index/chunk read or parse errors, on rank mismatches, or when a
/// chunk payload's shape disagrees with its expected extent.
async fn read_array_payload_chunked_async(
    root: &Path,
    meta: &DataArrayMeta,
    index_path: &Path,
) -> BuiltinResult<DataArrayPayload> {
    let bytes = fs::read_async(index_path).await.map_err(|err| {
        data_error(format!(
            "failed to read chunk index '{}': {err}",
            index_path.display()
        ))
    })?;
    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
        data_error(format!(
            "failed to parse chunk index '{}': {err}",
            index_path.display()
        ))
    })?;
    let mut values = vec![0.0; meta.shape.iter().copied().product::<usize>()];
    for chunk in index.chunks {
        let chunk_path = root.join(&chunk.data_path);
        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
            data_error(format!(
                "failed to read chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
            data_error(format!(
                "failed to parse chunk payload '{}': {err}",
                chunk_path.display()
            ))
        })?;
        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
        // Legacy index entries may omit `shape`; recompute it from geometry.
        let chunk_extent = if chunk.shape.is_empty() {
            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
        } else {
            chunk.shape.clone()
        };
        if payload.shape != chunk_extent {
            return Err(data_error(format!(
                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
                chunk.key, payload.shape, chunk_extent
            )));
        }
        // Do-while odometer over chunk-local coordinates; copies each element
        // to its global position.
        let mut local = vec![0usize; chunk_extent.len()];
        loop {
            let mut global = Vec::with_capacity(chunk_extent.len());
            for dim in 0..chunk_extent.len() {
                global.push(chunk_start[dim] + local[dim]);
            }
            let src_linear = linear_index_column_major(&local, &chunk_extent)?;
            let dst_linear = linear_index_column_major(&global, &meta.shape)?;
            values[dst_linear] = payload.values[src_linear];
            if !advance_index(&mut local, &chunk_extent) {
                break;
            }
        }
    }
    Ok(DataArrayPayload {
        dtype: meta.dtype.clone(),
        shape: meta.shape.clone(),
        values,
    })
}
507
/// Offer freshly written chunks to the filesystem backend for remote upload.
///
/// Requests per-chunk upload targets and uploads each chunk's bytes. If the
/// backend reports `ErrorKind::Unsupported` (no remote data service
/// configured), the entire step is skipped silently — the local writes have
/// already succeeded.
///
/// # Errors
/// Fails if the target request errors for any other reason, if a chunk has
/// no matching target, or if an individual upload fails.
async fn maybe_upload_chunks_async(
    root: &Path,
    array: &str,
    chunks: Vec<(DataChunkDescriptor, Vec<u8>)>,
) -> BuiltinResult<()> {
    if chunks.is_empty() {
        return Ok(());
    }
    let request = DataChunkUploadRequest {
        dataset_path: root.to_string_lossy().to_string(),
        array: array.to_string(),
        chunks: chunks.iter().map(|(desc, _)| desc.clone()).collect(),
    };
    let targets = match fs::data_chunk_upload_targets_async(&request).await {
        Ok(targets) => targets,
        // Backend without upload support: treat as best-effort no-op.
        Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
        Err(err) => {
            return Err(data_error(format!(
                "failed to request data chunk upload targets: {err}"
            )))
        }
    };
    for (descriptor, bytes) in chunks {
        let target = find_chunk_target(&targets, &descriptor.key)?;
        fs::data_upload_chunk_async(target, &bytes)
            .await
            .map_err(|err| {
                data_error(format!(
                    "failed to upload chunk '{}': {err}",
                    descriptor.key
                ))
            })?;
        tracing::info!(
            target: "runmat.data",
            dataset = %root.display(),
            array = array,
            chunk_key = descriptor.key,
            bytes = bytes.len(),
            "data chunk uploaded"
        );
    }
    Ok(())
}
551
552fn find_chunk_target<'a>(
553    targets: &'a [DataChunkUploadTarget],
554    key: &str,
555) -> BuiltinResult<&'a DataChunkUploadTarget> {
556    targets
557        .iter()
558        .find(|target| target.key == key)
559        .ok_or_else(|| data_error(format!("missing upload target for chunk '{key}'")))
560}
561
562pub fn sha256_hex(bytes: &[u8]) -> String {
563    let mut hasher = Sha256::new();
564    hasher.update(bytes);
565    let digest = hasher.finalize();
566    format!("sha256:{:x}", digest)
567}
568
/// Render grid coordinates as a dotted key, e.g. `[0, 2, 1]` -> `"0.2.1"`.
/// An empty coordinate list yields the empty string (rank-0 arrays).
fn chunk_key(coords: &[usize]) -> String {
    let mut key = String::new();
    for (i, coord) in coords.iter().enumerate() {
        if i > 0 {
            key.push('.');
        }
        key.push_str(&coord.to_string());
    }
    key
}
576
/// Number of chunks per dimension: ceil(extent / chunk) for each axis.
/// Missing or zero chunk extents are treated as 1.
fn chunk_grid_shape(shape: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut grid = Vec::with_capacity(shape.len());
    for (idx, &extent) in shape.iter().enumerate() {
        let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
        grid.push(extent.div_ceil(chunk));
    }
    grid
}
587
/// Global start position of the chunk at grid `coords`: coord * chunk extent
/// per axis. Missing or zero chunk extents are treated as 1.
fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
    let mut start = Vec::with_capacity(coords.len());
    for (idx, &coord) in coords.iter().enumerate() {
        let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
        start.push(coord * chunk);
    }
    start
}
595
/// Actual extent of the chunk starting at `start`: the nominal chunk shape,
/// trimmed so the chunk does not run past `full_shape` (edge chunks shrink).
fn chunk_extent_for_start(
    start: &[usize],
    chunk_shape: &[usize],
    full_shape: &[usize],
) -> Vec<usize> {
    let mut extents = Vec::with_capacity(start.len());
    for (idx, &origin) in start.iter().enumerate() {
        let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
        let end = (origin + chunk).min(full_shape[idx]);
        extents.push(end.saturating_sub(origin));
    }
    extents
}
611
612fn collect_chunk_values(
613    payload: &DataArrayPayload,
614    chunk_start: &[usize],
615    chunk_extent: &[usize],
616) -> BuiltinResult<Vec<f64>> {
617    let mut local = vec![0usize; chunk_extent.len()];
618    let mut values = Vec::with_capacity(chunk_extent.iter().copied().product());
619    loop {
620        let mut global = Vec::with_capacity(chunk_extent.len());
621        for dim in 0..chunk_extent.len() {
622            global.push(chunk_start[dim] + local[dim]);
623        }
624        let linear = linear_index_column_major(&global, &payload.shape)?;
625        values.push(payload.values[linear]);
626        if !advance_index(&mut local, chunk_extent) {
627            break;
628        }
629    }
630    Ok(values)
631}
632
633fn chunk_coords_from_entry(entry: &DataChunkIndexEntry, rank: usize) -> BuiltinResult<Vec<usize>> {
634    if !entry.coords.is_empty() {
635        if entry.coords.len() != rank {
636            return Err(data_error(format!(
637                "chunk coords rank mismatch for key '{}': expected {rank}, got {}",
638                entry.key,
639                entry.coords.len()
640            )));
641        }
642        return Ok(entry.coords.clone());
643    }
644    let coords = entry
645        .key
646        .split('.')
647        .map(|part| {
648            part.parse::<usize>()
649                .map_err(|_| data_error(format!("invalid chunk key '{}'", entry.key)))
650        })
651        .collect::<BuiltinResult<Vec<_>>>()?;
652    if coords.len() != rank {
653        return Err(data_error(format!(
654            "chunk key rank mismatch for key '{}': expected {rank}, got {}",
655            entry.key,
656            coords.len()
657        )));
658    }
659    Ok(coords)
660}
661
/// Clamp a requested slice (`start`, `shape`) against the array bounds.
///
/// Missing axes default to start 0 and the full axis length. Per axis, the
/// start is clamped into `[0, len-1]` and the span into
/// `[1, len - clamped_start]` — so a request can shrink but never produces a
/// zero-length span. Returns the normalized `(start, shape)` pair; rank-0
/// arrays yield two empty vectors.
///
/// # Errors
/// Fails if any axis of `full_shape` has length zero.
fn normalize_slice_bounds(
    full_shape: &[usize],
    start: &[usize],
    shape: &[usize],
) -> BuiltinResult<(Vec<usize>, Vec<usize>)> {
    if full_shape.is_empty() {
        return Ok((Vec::new(), Vec::new()));
    }
    let mut normalized_start = Vec::with_capacity(full_shape.len());
    let mut normalized_shape = Vec::with_capacity(full_shape.len());
    for (axis, axis_len) in full_shape.iter().copied().enumerate() {
        if axis_len == 0 {
            return Err(data_error("slice axis length must be greater than zero"));
        }
        let requested_start = start.get(axis).copied().unwrap_or(0);
        let clamped_start = requested_start.min(axis_len.saturating_sub(1));
        let requested_span = shape.get(axis).copied().unwrap_or(axis_len);
        let clamped_span = requested_span
            .max(1)
            .min(axis_len.saturating_sub(clamped_start));
        normalized_start.push(clamped_start);
        normalized_shape.push(clamped_span);
    }
    Ok((normalized_start, normalized_shape))
}
687
/// True when the global coordinate lies inside the half-open slice box
/// `[slice_start, slice_start + slice_shape)` on every dimension.
fn coordinate_in_slice(global: &[usize], slice_start: &[usize], slice_shape: &[usize]) -> bool {
    (0..slice_shape.len()).all(|dim| {
        let lo = slice_start[dim];
        let hi = lo.saturating_add(slice_shape[dim]);
        (lo..hi).contains(&global[dim])
    })
}
699
/// True when the chunk's box overlaps the slice's box on every dimension
/// (both boxes half-open: `[start, start + extent)`).
fn chunk_intersects_slice(
    chunk_start: &[usize],
    chunk_extent: &[usize],
    slice_start: &[usize],
    slice_shape: &[usize],
) -> bool {
    (0..slice_shape.len()).all(|dim| {
        let chunk_lo = chunk_start[dim];
        let chunk_hi = chunk_lo.saturating_add(chunk_extent[dim]);
        let slice_lo = slice_start[dim];
        let slice_hi = slice_lo.saturating_add(slice_shape[dim]);
        chunk_hi > slice_lo && slice_hi > chunk_lo
    })
}
717
718fn extract_slice_payload(
719    payload: &DataArrayPayload,
720    start: &[usize],
721    shape: &[usize],
722) -> BuiltinResult<DataArrayPayload> {
723    let mut values = Vec::with_capacity(shape.iter().copied().product());
724    if shape.is_empty() {
725        return Ok(DataArrayPayload {
726            dtype: payload.dtype.clone(),
727            shape: Vec::new(),
728            values,
729        });
730    }
731    let mut local = vec![0usize; shape.len()];
732    loop {
733        let mut global = Vec::with_capacity(shape.len());
734        for dim in 0..shape.len() {
735            global.push(start[dim] + local[dim]);
736        }
737        let linear = linear_index_column_major(&global, &payload.shape)?;
738        values.push(payload.values[linear]);
739        if !advance_index(&mut local, shape) {
740            break;
741        }
742    }
743    Ok(DataArrayPayload {
744        dtype: payload.dtype.clone(),
745        shape: shape.to_vec(),
746        values,
747    })
748}
749
750fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
751    if index.len() != shape.len() {
752        return Err(data_error("chunk index rank mismatch"));
753    }
754    let mut stride = 1usize;
755    let mut linear = 0usize;
756    for (idx, extent) in index.iter().zip(shape.iter()) {
757        if *idx >= *extent {
758            return Err(data_error("chunk index out of bounds"));
759        }
760        linear += idx * stride;
761        stride = stride.saturating_mul(*extent);
762    }
763    Ok(linear)
764}
765
/// Step a column-major odometer: increment `index` within `shape`, carrying
/// into higher dimensions. Returns `false` (with `index` reset to all zeros)
/// once every position has been visited; `false` immediately for rank 0.
fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
    if shape.is_empty() {
        return false;
    }
    let mut dim = 0;
    while dim < shape.len() {
        index[dim] += 1;
        if index[dim] < shape[dim] {
            return true;
        }
        index[dim] = 0;
        dim += 1;
    }
    false
}
779
/// Parse a `data.create` schema struct into a `DataSchema`.
///
/// Expected shape: `schema.arrays.<name>` is a struct with optional fields
/// `dtype` (default "f64"), `shape` (default `[0, 0]`), `chunk` (default
/// derived via `default_chunk_shape`), and `codec` (default "zstd"). The
/// payload and chunk-index paths are derived from the array name.
///
/// # Errors
/// Fails when `schema`, `schema.arrays`, or any array entry is not a struct,
/// or when a nested field has the wrong type.
pub fn parse_schema(schema: &Value) -> BuiltinResult<DataSchema> {
    let Value::Struct(schema_struct) = schema else {
        return Err(data_error("data.create: schema must be a struct"));
    };
    let arrays_value = schema_struct
        .fields
        .get("arrays")
        .ok_or_else(|| data_error("data.create: schema missing 'arrays' field"))?;
    let Value::Struct(arrays_struct) = arrays_value else {
        return Err(data_error("data.create: schema.arrays must be a struct"));
    };

    let mut arrays = BTreeMap::new();
    for (name, meta_value) in &arrays_struct.fields {
        let Value::Struct(meta_struct) = meta_value else {
            return Err(data_error(format!(
                "data.create: schema.arrays.{name} must be a struct"
            )));
        };
        let dtype = meta_struct
            .fields
            .get("dtype")
            .map(|v| parse_string(v, "data.create schema dtype"))
            .transpose()?
            .unwrap_or_else(|| "f64".to_string());
        let shape = meta_struct
            .fields
            .get("shape")
            .map(parse_usize_vector)
            .transpose()?
            .unwrap_or_else(|| vec![0, 0]);
        let chunk_shape = meta_struct
            .fields
            .get("chunk")
            .map(parse_usize_vector)
            .transpose()?
            .unwrap_or_else(|| default_chunk_shape(&shape));
        let codec = meta_struct
            .fields
            .get("codec")
            .map(|v| parse_string(v, "data.create schema codec"))
            .transpose()?
            .unwrap_or_else(|| "zstd".to_string());
        // Storage layout is fixed by convention under arrays/<name>/.
        let data_path = format!("arrays/{name}/data.f64.json");
        let chunk_index_path = format!("arrays/{name}/chunks/index.json");
        arrays.insert(
            name.clone(),
            DataArrayMeta {
                dtype,
                shape,
                chunk_shape,
                order: default_array_order(),
                codec,
                chunk_index_path: Some(chunk_index_path),
                data_path,
            },
        );
    }

    Ok(DataSchema { arrays })
}
841
/// Choose a default chunk shape for an array shape: 1024 for rank 0, up to
/// 65,536 for vectors, and for higher ranks up to 256 along the first two
/// dimensions and up to 8 along the rest (each dimension at least 1).
fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
    match shape.len() {
        0 => vec![1024],
        1 => vec![shape[0].clamp(1, 65_536)],
        _ => shape
            .iter()
            .enumerate()
            .map(|(idx, &extent)| {
                let cap = if idx < 2 { 256 } else { 8 };
                extent.clamp(1, cap)
            })
            .collect(),
    }
}
858
859fn parse_usize_vector(value: &Value) -> BuiltinResult<Vec<usize>> {
860    match value {
861        Value::Tensor(t) => tensor_to_usize_vector(t),
862        Value::Num(n) => {
863            if *n < 0.0 || !n.is_finite() {
864                return Err(data_error(
865                    "data schema dimensions must be non-negative finite numbers",
866                ));
867            }
868            Ok(vec![*n as usize])
869        }
870        Value::Int(i) => {
871            let n = i.to_i64();
872            if n < 0 {
873                return Err(data_error("data schema dimensions must be non-negative"));
874            }
875            Ok(vec![n as usize])
876        }
877        _ => Err(data_error(
878            "data schema dimension field must be numeric tensor/vector",
879        )),
880    }
881}
882
883fn tensor_to_usize_vector(t: &Tensor) -> BuiltinResult<Vec<usize>> {
884    let mut out = Vec::with_capacity(t.data.len());
885    for value in &t.data {
886        if !value.is_finite() || *value < 0.0 {
887            return Err(data_error(
888                "data schema dimensions must be non-negative finite numbers",
889            ));
890        }
891        out.push(*value as usize);
892    }
893    Ok(out)
894}
895
/// Build the `Dataset` object value handed back to user code.
///
/// Internal properties carry the dataset location (`__data_path`), its id
/// (`__data_id`), and a version token (`__data_version`, see
/// `manifest_version_token`) used to detect stale handles.
pub fn dataset_object(path: &str, manifest: &DataManifest) -> Value {
    let mut obj = ObjectInstance::new("Dataset".to_string());
    obj.properties
        .insert("__data_path".to_string(), Value::String(path.to_string()));
    obj.properties.insert(
        "__data_id".to_string(),
        Value::String(manifest.dataset_id.clone()),
    );
    obj.properties.insert(
        "__data_version".to_string(),
        Value::String(manifest_version_token(manifest)),
    );
    Value::Object(obj)
}
910
911pub fn manifest_version_token(manifest: &DataManifest) -> String {
912    format!("{}:{}", manifest.updated_at, manifest.txn_sequence)
913}
914
915pub fn ensure_manifest_sequence(expected: u64, manifest: &DataManifest) -> BuiltinResult<()> {
916    if manifest.txn_sequence != expected {
917        tracing::warn!(
918            target: "runmat.data",
919            expected_sequence = expected,
920            actual_sequence = manifest.txn_sequence,
921            "manifest conflict detected"
922        );
923        return Err(data_error(
924            "MANIFEST_CONFLICT: dataset changed since transaction begin",
925        ));
926    }
927    Ok(())
928}
929
930pub fn array_object(dataset_path: &str, array_name: &str) -> Value {
931    let mut obj = ObjectInstance::new("DataArray".to_string());
932    obj.properties.insert(
933        "__data_path".to_string(),
934        Value::String(dataset_path.to_string()),
935    );
936    obj.properties.insert(
937        "__array_name".to_string(),
938        Value::String(array_name.to_string()),
939    );
940    Value::Object(obj)
941}
942
943pub fn transaction_object(dataset_path: &str, tx_id: &str) -> Value {
944    let mut obj = ObjectInstance::new("DataTransaction".to_string());
945    obj.properties.insert(
946        "__data_path".to_string(),
947        Value::String(dataset_path.to_string()),
948    );
949    obj.properties
950        .insert("__tx_id".to_string(), Value::String(tx_id.to_string()));
951    Value::Object(obj)
952}
953
954pub fn get_object_prop<'a>(obj: &'a ObjectInstance, key: &str) -> BuiltinResult<&'a Value> {
955    obj.properties
956        .get(key)
957        .ok_or_else(|| data_error(format!("object missing internal property '{key}'")))
958}
959
960pub fn now_rfc3339() -> String {
961    Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
962}
963
964pub fn new_dataset_id() -> String {
965    static NEXT_DATASET_ID: AtomicU64 = AtomicU64::new(1);
966    let seq = NEXT_DATASET_ID.fetch_add(1, Ordering::Relaxed);
967    format!("ds_{}_{}", Utc::now().timestamp_millis(), seq)
968}
969
970pub fn new_tx_id() -> String {
971    static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);
972    let seq = NEXT_TX_ID.fetch_add(1, Ordering::Relaxed);
973    format!("tx_{}_{}", Utc::now().timestamp_millis(), seq)
974}
975
976pub fn start_tx(dataset_path: String, base_sequence: u64) -> String {
977    let tx_id = new_tx_id();
978    let pending = PendingTxn {
979        dataset_path,
980        base_sequence,
981        writes: Vec::new(),
982        resizes: Vec::new(),
983        fills: Vec::new(),
984        create_arrays: Vec::new(),
985        delete_arrays: Vec::new(),
986        attrs: BTreeMap::new(),
987        status: TxnStatus::Open,
988    };
989    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
990    guard.insert(tx_id.clone(), pending);
991    tx_id
992}
993
994pub fn with_tx_mut<T>(
995    tx_id: &str,
996    f: impl FnOnce(&mut PendingTxn) -> BuiltinResult<T>,
997) -> BuiltinResult<T> {
998    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
999    let tx = guard
1000        .get_mut(tx_id)
1001        .ok_or_else(|| data_error(format!("transaction '{tx_id}' not found")))?;
1002    f(tx)
1003}
1004
1005pub fn with_tx<T>(
1006    tx_id: &str,
1007    f: impl FnOnce(&PendingTxn) -> BuiltinResult<T>,
1008) -> BuiltinResult<T> {
1009    let guard = tx_registry().lock().expect("tx registry lock poisoned");
1010    let tx = guard
1011        .get(tx_id)
1012        .ok_or_else(|| data_error(format!("transaction '{tx_id}' not found")))?;
1013    f(tx)
1014}
1015
1016pub fn remove_tx(tx_id: &str) {
1017    let mut guard = tx_registry().lock().expect("tx registry lock poisoned");
1018    let _ = guard.remove(tx_id);
1019}
1020
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal manifest fixture whose only varying field is the
    /// transaction sequence; shared by the `ensure_manifest_sequence` tests
    /// to avoid duplicating the full struct literal.
    fn manifest_with_sequence(txn_sequence: u64) -> DataManifest {
        DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: "2026-03-01T00:00:00Z".to_string(),
            updated_at: "2026-03-01T00:00:00Z".to_string(),
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence,
        }
    }

    #[test]
    fn ensure_manifest_sequence_accepts_matching_sequence() {
        let manifest = manifest_with_sequence(5);
        ensure_manifest_sequence(5, &manifest).expect("expected sequence match");
    }

    #[test]
    fn ensure_manifest_sequence_rejects_conflict() {
        let manifest = manifest_with_sequence(6);
        let err = ensure_manifest_sequence(5, &manifest).expect_err("expected conflict error");
        assert!(err.message().contains("MANIFEST_CONFLICT"));
    }

    #[test]
    fn transaction_registry_roundtrip() {
        let tx_id = start_tx("/datasets/test.data".to_string(), 7);
        let status = with_tx(&tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
        assert_eq!(status, TxnStatus::Open);
        remove_tx(&tx_id);
        let err = with_tx(&tx_id, |_| Ok(())).expect_err("expected missing tx");
        assert!(err.message().contains("not found"));
    }

    #[test]
    fn sha256_hash_format_matches_expected_prefix() {
        let hash = sha256_hex(b"runmat");
        assert!(hash.starts_with("sha256:"));
        assert_eq!(hash.len(), "sha256:".len() + 64);
    }
}