Skip to main content

runmat_runtime/data/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, HashMap};
3use std::future::Future;
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use chrono::Utc;
8use runmat_builtins::{ObjectInstance, Tensor, Value};
9use runmat_filesystem as fs;
10use runmat_filesystem::data_contract::{
11    DataChunkDescriptor, DataChunkUploadRequest, DataChunkUploadTarget,
12};
13use serde::{Deserialize, Serialize};
14use sha2::{Digest, Sha256};
15
16use crate::{build_runtime_error, BuiltinResult, RuntimeError};
17
/// Top-level dataset metadata, serialized as `manifest.json` in the dataset root
/// (see `manifest_path`, `write_manifest_async` and `read_manifest_async`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataManifest {
    /// Version number of the manifest layout.
    /// NOTE(review): no migration logic for this field is visible in this module.
    pub schema_version: u32,
    /// Dataset format tag; the accepted values are not visible here.
    pub format: String,
    /// Dataset identifier, exposed to scripts as the `__data_id` property of a
    /// `Dataset` object (see `dataset_object`).
    pub dataset_id: String,
    /// Optional human-readable dataset name.
    pub name: Option<String>,
    /// Creation timestamp.
    /// NOTE(review): presumably an RFC 3339 string produced via `chrono` — confirm.
    pub created_at: String,
    /// Last-update timestamp; combined with `txn_sequence` in `manifest_version_token`.
    pub updated_at: String,
    /// Per-array metadata keyed by array name.
    pub arrays: BTreeMap<String, DataArrayMeta>,
    /// Free-form JSON attributes attached to the dataset.
    pub attrs: BTreeMap<String, serde_json::Value>,
    /// Commit sequence number. `ensure_manifest_sequence` compares it with the value
    /// a transaction expects, to detect that the dataset changed since the transaction began.
    pub txn_sequence: u64,
}
30
/// Manifest entry that describes how one array is stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayMeta {
    /// Element type name; `parse_schema` defaults it to `"f64"`. Payload values are
    /// held as `f64` in memory regardless (see `DataArrayPayload::values`).
    pub dtype: String,
    /// Array extent along each axis.
    pub shape: Vec<usize>,
    /// Chunk extent along each axis. The chunk-grid helpers treat a missing or zero
    /// entry as 1.
    pub chunk_shape: Vec<usize>,
    /// Element order. Manifests without this field deserialize as `"column_major"`,
    /// which matches the column-major linear indexing used by the readers and writers.
    #[serde(default = "default_array_order")]
    pub order: String,
    /// Codec name; `parse_schema` defaults it to `"zstd"`.
    /// NOTE(review): the writer in this module stores plain JSON and records
    /// `bytes_stored == bytes_raw`, so the codec does not appear to be applied yet.
    pub codec: String,
    /// Chunk index path relative to the dataset root. Readers fall back to
    /// `data_path` when this is `None` or when the file does not exist.
    #[serde(default)]
    pub chunk_index_path: Option<String>,
    /// Whole-array JSON payload path, relative to the dataset root.
    pub data_path: String,
}
43
/// Serde default for `DataArrayMeta::order`: arrays are laid out column-major.
fn default_array_order() -> String {
    String::from("column_major")
}
47
/// In-memory (and JSON on-disk) form of an array, or of one chunk of an array.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataArrayPayload {
    /// Element type name copied from the array metadata.
    pub dtype: String,
    /// Extent along each axis.
    pub shape: Vec<usize>,
    /// Element values in column-major order (first axis varies fastest), as
    /// addressed by `linear_index_column_major`.
    pub values: Vec<f64>,
}
54
/// Chunk index for one array, stored as `arrays/<name>/chunks/index.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndex {
    /// Index layout version; `write_array_payload_async` writes `1`.
    pub schema_version: u32,
    /// Name of the array this index describes.
    pub array: String,
    /// One entry per chunk file.
    pub chunks: Vec<DataChunkIndexEntry>,
}
61
/// Location and integrity data for a single chunk.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataChunkIndexEntry {
    /// Chunk-grid coordinates joined with `.` (see `chunk_key`), e.g. `"0.2"`.
    pub key: String,
    /// `obj_` followed by the key with `.` replaced by `_`.
    pub object_id: String,
    /// `sha256:`-prefixed lowercase hex SHA-256 of the stored chunk bytes.
    pub hash: String,
    /// Encoded chunk size in bytes.
    pub bytes_raw: u64,
    /// Stored chunk size in bytes; the writer here records the same value as `bytes_raw`.
    pub bytes_stored: u64,
    /// Chunk-grid coordinates. Indexes written without this field deserialize it
    /// empty, and readers then parse the coordinates from `key`.
    #[serde(default)]
    pub coords: Vec<usize>,
    /// Actual chunk extent (edge chunks may be smaller than the nominal chunk shape).
    /// When empty, readers recompute it from the array metadata.
    #[serde(default)]
    pub shape: Vec<usize>,
    /// Chunk file path relative to the dataset root.
    pub data_path: String,
}
75
/// Result of `parse_schema`: array metadata keyed by array name.
#[derive(Debug, Clone)]
pub struct DataSchema {
    pub arrays: BTreeMap<String, DataArrayMeta>,
}
80
/// In-memory record of a data transaction's staged operations. Instances live in the
/// transaction registry reached through `with_tx_registry`.
#[derive(Debug, Clone)]
pub struct PendingTxn {
    /// Path of the dataset the transaction targets.
    pub dataset_path: String,
    /// Manifest `txn_sequence` observed when the transaction began.
    /// NOTE(review): presumably checked with `ensure_manifest_sequence` at commit — confirm.
    pub base_sequence: u64,
    /// Staged element writes.
    pub writes: Vec<PendingWrite>,
    /// Staged array resizes.
    pub resizes: Vec<PendingResize>,
    /// Staged fill operations.
    pub fills: Vec<PendingFill>,
    /// Arrays to create on commit.
    pub create_arrays: Vec<PendingCreateArray>,
    /// Names of arrays to delete on commit.
    pub delete_arrays: Vec<String>,
    /// Staged dataset attribute updates keyed by attribute name.
    pub attrs: BTreeMap<String, Value>,
    /// Lifecycle state of the transaction.
    pub status: TxnStatus,
}
93
/// A staged write of `value` into `array`.
#[derive(Debug, Clone)]
pub struct PendingWrite {
    /// Target array name.
    pub array: String,
    /// Optional slice selector.
    /// NOTE(review): `None` presumably means the whole array — confirm with the commit code.
    pub slice_spec: Option<Value>,
    /// Value to write.
    pub value: Value,
}
100
/// A staged change of an array's shape.
#[derive(Debug, Clone)]
pub struct PendingResize {
    /// Target array name.
    pub array: String,
    /// New extent along each axis.
    pub shape: Vec<usize>,
}
106
/// A staged fill of (part of) an array with a single value.
#[derive(Debug, Clone)]
pub struct PendingFill {
    /// Target array name.
    pub array: String,
    /// Optional slice selector.
    /// NOTE(review): `None` presumably means the whole array — confirm.
    pub slice_spec: Option<Value>,
    /// Fill value.
    pub value: Value,
}
113
/// A staged creation of a new array with the given metadata.
#[derive(Debug, Clone)]
pub struct PendingCreateArray {
    /// Name of the array to create.
    pub array: String,
    /// Storage metadata for the new array.
    pub meta: DataArrayMeta,
}
119
/// Lifecycle state of a `PendingTxn`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxnStatus {
    /// Still accepting staged operations.
    Open,
    /// Applied to the dataset.
    Committed,
    /// Discarded without being applied.
    Aborted,
}
126
// Per-thread transaction registry (transaction id -> pending transaction).
// `with_tx_registry` uses it whenever no task-local registry scope is active; on
// wasm32 the task-local is not compiled, so this is always the registry there.
thread_local! {
    static FALLBACK_TX_REGISTRY: RefCell<HashMap<String, PendingTxn>> = RefCell::new(HashMap::new());
}
130
// Tokio task-local transaction registry (non-wasm targets only). It is installed by
// `with_tx_registry_scope`, and while present it takes precedence over
// `FALLBACK_TX_REGISTRY` in `with_tx_registry`.
#[cfg(not(target_arch = "wasm32"))]
tokio::task_local! {
    static TASK_TX_REGISTRY: RefCell<HashMap<String, PendingTxn>>;
}
135
136pub async fn with_tx_registry_scope<F>(future: F) -> F::Output
137where
138    F: Future,
139{
140    #[cfg(not(target_arch = "wasm32"))]
141    {
142        if TASK_TX_REGISTRY.try_with(|_| ()).is_ok() {
143            future.await
144        } else {
145            TASK_TX_REGISTRY
146                .scope(RefCell::new(HashMap::new()), future)
147                .await
148        }
149    }
150    #[cfg(target_arch = "wasm32")]
151    {
152        future.await
153    }
154}
155
156fn with_tx_registry<T>(f: impl FnOnce(&mut HashMap<String, PendingTxn>) -> T) -> BuiltinResult<T> {
157    #[cfg(not(target_arch = "wasm32"))]
158    {
159        if TASK_TX_REGISTRY.try_with(|_| ()).is_ok() {
160            return TASK_TX_REGISTRY.with(|registry| {
161                let mut registry = registry.try_borrow_mut().map_err(|_| {
162                    data_error("data transaction registry is already mutably borrowed")
163                })?;
164                Ok(f(&mut registry))
165            });
166        }
167    }
168
169    FALLBACK_TX_REGISTRY.with(|registry| {
170        let mut registry = registry
171            .try_borrow_mut()
172            .map_err(|_| data_error("data transaction registry is already mutably borrowed"))?;
173        Ok(f(&mut registry))
174    })
175}
176
177pub fn data_error(message: impl Into<String>) -> RuntimeError {
178    build_runtime_error(message)
179        .with_identifier("RUNMAT:Data:Error")
180        .with_builtin("data")
181        .build()
182}
183
184fn data_error_with_identifier(
185    message: impl Into<String>,
186    identifier: &'static str,
187) -> RuntimeError {
188    build_runtime_error(message)
189        .with_identifier(identifier)
190        .with_builtin("data")
191        .build()
192}
193
194const DATA_MANIFEST_CONFLICT_IDENTIFIER: &str = "RunMat:data:ManifestConflict";
195const DATA_TRANSACTION_NOT_FOUND_IDENTIFIER: &str = "RunMat:data:TransactionNotFound";
196
197pub fn parse_string(value: &Value, context: &str) -> BuiltinResult<String> {
198    match value {
199        Value::String(s) => Ok(s.clone()),
200        Value::CharArray(ca) => Ok(ca.to_string()),
201        _ => Err(data_error(format!("{context}: expected string value"))),
202    }
203}
204
/// Converts a dataset path string into the dataset's root directory path (no I/O).
pub fn dataset_root(path: &str) -> PathBuf {
    Path::new(path).to_path_buf()
}
208
/// Location of the dataset manifest: `<root>/manifest.json`.
pub fn manifest_path(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("manifest.json");
    path
}
212
/// Directory that holds per-array subdirectories: `<root>/arrays`.
pub fn arrays_root(root: &Path) -> PathBuf {
    let mut path = root.to_path_buf();
    path.push("arrays");
    path
}
216
217pub async fn write_manifest_async(root: &Path, manifest: &DataManifest) -> BuiltinResult<()> {
218    fs::create_dir_all_async(root).await.map_err(|err| {
219        data_error(format!(
220            "failed to create dataset root '{}': {err}",
221            root.display()
222        ))
223    })?;
224    let path = manifest_path(root);
225    let bytes = serde_json::to_vec_pretty(manifest)
226        .map_err(|err| data_error(format!("failed to encode manifest json: {err}")))?;
227    fs::write_async(&path, &bytes).await.map_err(|err| {
228        data_error(format!(
229            "failed to write manifest '{}': {err}",
230            path.display()
231        ))
232    })?;
233    Ok(())
234}
235
236pub async fn read_manifest_async(root: &Path) -> BuiltinResult<DataManifest> {
237    let path = manifest_path(root);
238    let bytes = fs::read_async(&path).await.map_err(|err| {
239        data_error(format!(
240            "failed to read manifest '{}': {err}",
241            path.display()
242        ))
243    })?;
244    let manifest = serde_json::from_slice::<DataManifest>(&bytes).map_err(|err| {
245        data_error(format!(
246            "failed to parse manifest '{}': {err}",
247            path.display()
248        ))
249    })?;
250    Ok(manifest)
251}
252
253pub async fn write_array_payload_async(
254    root: &Path,
255    array: &str,
256    payload: &DataArrayPayload,
257    chunk_shape: &[usize],
258) -> BuiltinResult<(PathBuf, PathBuf)> {
259    let array_dir = arrays_root(root).join(array);
260    fs::create_dir_all_async(&array_dir).await.map_err(|err| {
261        data_error(format!(
262            "failed to create array dir '{}': {err}",
263            array_dir.display()
264        ))
265    })?;
266    let payload_path = array_dir.join("data.f64.json");
267    let bytes = serde_json::to_vec(payload)
268        .map_err(|err| data_error(format!("failed to encode array payload json: {err}")))?;
269    fs::write_async(&payload_path, &bytes)
270        .await
271        .map_err(|err| {
272            data_error(format!(
273                "failed to write payload '{}': {err}",
274                payload_path.display()
275            ))
276        })?;
277
278    let chunk_dir = array_dir.join("chunks");
279    fs::create_dir_all_async(&chunk_dir).await.map_err(|err| {
280        data_error(format!(
281            "failed to create chunk dir '{}': {err}",
282            chunk_dir.display()
283        ))
284    })?;
285
286    let mut index = DataChunkIndex {
287        schema_version: 1,
288        array: array.to_string(),
289        chunks: Vec::new(),
290    };
291    let mut upload_chunks = Vec::new();
292    let grid_shape = chunk_grid_shape(&payload.shape, chunk_shape);
293    let mut coords = vec![0usize; payload.shape.len()];
294    loop {
295        let chunk_start = chunk_start_for_coords(&coords, chunk_shape);
296        let chunk_extent = chunk_extent_for_start(&chunk_start, chunk_shape, &payload.shape);
297        let chunk_payload = DataArrayPayload {
298            dtype: payload.dtype.clone(),
299            shape: chunk_extent.clone(),
300            values: collect_chunk_values(payload, &chunk_start, &chunk_extent)?,
301        };
302        let key = chunk_key(&coords);
303        let object_id = format!("obj_{}", key.replace('.', "_"));
304        let chunk_bytes = serde_json::to_vec(&chunk_payload)
305            .map_err(|err| data_error(format!("failed to encode chunk payload: {err}")))?;
306        let data_path = chunk_dir.join(format!("{object_id}.json"));
307        fs::write_async(&data_path, &chunk_bytes)
308            .await
309            .map_err(|err| {
310                data_error(format!(
311                    "failed to write chunk '{}': {err}",
312                    data_path.display()
313                ))
314            })?;
315        let hash = sha256_hex(&chunk_bytes);
316        let rel_chunk_path = data_path
317            .strip_prefix(root)
318            .map_err(|err| data_error(format!("failed to compute chunk relative path: {err}")))?
319            .to_string_lossy()
320            .to_string();
321        index.chunks.push(DataChunkIndexEntry {
322            key: key.clone(),
323            object_id: object_id.clone(),
324            hash: hash.clone(),
325            bytes_raw: chunk_bytes.len() as u64,
326            bytes_stored: chunk_bytes.len() as u64,
327            coords: coords.clone(),
328            shape: chunk_extent,
329            data_path: rel_chunk_path,
330        });
331        upload_chunks.push((
332            DataChunkDescriptor {
333                key,
334                object_id,
335                hash,
336                bytes_raw: chunk_bytes.len() as u64,
337                bytes_stored: chunk_bytes.len() as u64,
338            },
339            chunk_bytes,
340        ));
341        if !advance_index(&mut coords, &grid_shape) {
342            break;
343        }
344    }
345
346    maybe_upload_chunks_async(root, array, upload_chunks).await?;
347
348    tracing::info!(
349        target: "runmat.data",
350        dataset = %root.display(),
351        array = array,
352        chunks = index.chunks.len(),
353        payload_bytes = bytes.len(),
354        "data chunk write planned"
355    );
356
357    let chunk_index_path = chunk_dir.join("index.json");
358    let chunk_index_bytes = serde_json::to_vec(&index)
359        .map_err(|err| data_error(format!("failed to encode chunk index json: {err}")))?;
360    fs::write_async(&chunk_index_path, &chunk_index_bytes)
361        .await
362        .map_err(|err| {
363            data_error(format!(
364                "failed to write chunk index '{}': {err}",
365                chunk_index_path.display()
366            ))
367        })?;
368    Ok((payload_path, chunk_index_path))
369}
370
371pub async fn read_array_payload_async(
372    root: &Path,
373    meta: &DataArrayMeta,
374) -> BuiltinResult<DataArrayPayload> {
375    if let Some(index_path) = &meta.chunk_index_path {
376        let path = root.join(index_path);
377        if fs::metadata_async(&path).await.is_ok() {
378            return read_array_payload_chunked_async(root, meta, &path).await;
379        }
380    }
381    let payload_path = root.join(&meta.data_path);
382    let bytes = fs::read_async(&payload_path).await.map_err(|err| {
383        data_error(format!(
384            "failed to read payload '{}': {err}",
385            payload_path.display()
386        ))
387    })?;
388    serde_json::from_slice::<DataArrayPayload>(&bytes).map_err(|err| {
389        data_error(format!(
390            "failed to parse payload '{}': {err}",
391            payload_path.display()
392        ))
393    })
394}
395
396pub async fn read_array_slice_payload_async(
397    root: &Path,
398    meta: &DataArrayMeta,
399    start: &[usize],
400    shape: &[usize],
401) -> BuiltinResult<DataArrayPayload> {
402    let (slice_start, slice_shape) = normalize_slice_bounds(&meta.shape, start, shape)?;
403    if let Some(index_path) = &meta.chunk_index_path {
404        let path = root.join(index_path);
405        if fs::metadata_async(&path).await.is_ok() {
406            return read_array_payload_chunked_slice_async(
407                root,
408                meta,
409                &path,
410                &slice_start,
411                &slice_shape,
412            )
413            .await;
414        }
415    }
416    let full = read_array_payload_async(root, meta).await?;
417    extract_slice_payload(&full, &slice_start, &slice_shape)
418}
419
420async fn read_array_payload_chunked_slice_async(
421    root: &Path,
422    meta: &DataArrayMeta,
423    index_path: &Path,
424    slice_start: &[usize],
425    slice_shape: &[usize],
426) -> BuiltinResult<DataArrayPayload> {
427    let bytes = fs::read_async(index_path).await.map_err(|err| {
428        data_error(format!(
429            "failed to read chunk index '{}': {err}",
430            index_path.display()
431        ))
432    })?;
433    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
434        data_error(format!(
435            "failed to parse chunk index '{}': {err}",
436            index_path.display()
437        ))
438    })?;
439
440    let mut values = vec![0.0; slice_shape.iter().copied().product::<usize>()];
441    for chunk in index.chunks {
442        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
443        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
444        let chunk_extent = if chunk.shape.is_empty() {
445            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
446        } else {
447            chunk.shape.clone()
448        };
449        if !chunk_intersects_slice(&chunk_start, &chunk_extent, slice_start, slice_shape) {
450            continue;
451        }
452
453        let chunk_path = root.join(&chunk.data_path);
454        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
455            data_error(format!(
456                "failed to read chunk payload '{}': {err}",
457                chunk_path.display()
458            ))
459        })?;
460        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
461            data_error(format!(
462                "failed to parse chunk payload '{}': {err}",
463                chunk_path.display()
464            ))
465        })?;
466        if payload.shape != chunk_extent {
467            return Err(data_error(format!(
468                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
469                chunk.key, payload.shape, chunk_extent
470            )));
471        }
472
473        let mut local = vec![0usize; chunk_extent.len()];
474        loop {
475            let mut global = Vec::with_capacity(chunk_extent.len());
476            for dim in 0..chunk_extent.len() {
477                global.push(chunk_start[dim] + local[dim]);
478            }
479            if coordinate_in_slice(&global, slice_start, slice_shape) {
480                let src_linear = linear_index_column_major(&local, &chunk_extent)?;
481                let mut dst = Vec::with_capacity(slice_shape.len());
482                for dim in 0..slice_shape.len() {
483                    dst.push(global[dim].saturating_sub(slice_start[dim]));
484                }
485                let dst_linear = linear_index_column_major(&dst, slice_shape)?;
486                values[dst_linear] = payload.values[src_linear];
487            }
488            if !advance_index(&mut local, &chunk_extent) {
489                break;
490            }
491        }
492    }
493
494    Ok(DataArrayPayload {
495        dtype: meta.dtype.clone(),
496        shape: slice_shape.to_vec(),
497        values,
498    })
499}
500
501async fn read_array_payload_chunked_async(
502    root: &Path,
503    meta: &DataArrayMeta,
504    index_path: &Path,
505) -> BuiltinResult<DataArrayPayload> {
506    let bytes = fs::read_async(index_path).await.map_err(|err| {
507        data_error(format!(
508            "failed to read chunk index '{}': {err}",
509            index_path.display()
510        ))
511    })?;
512    let index: DataChunkIndex = serde_json::from_slice(&bytes).map_err(|err| {
513        data_error(format!(
514            "failed to parse chunk index '{}': {err}",
515            index_path.display()
516        ))
517    })?;
518    let mut values = vec![0.0; meta.shape.iter().copied().product::<usize>()];
519    for chunk in index.chunks {
520        let chunk_path = root.join(&chunk.data_path);
521        let bytes = fs::read_async(&chunk_path).await.map_err(|err| {
522            data_error(format!(
523                "failed to read chunk payload '{}': {err}",
524                chunk_path.display()
525            ))
526        })?;
527        let payload: DataArrayPayload = serde_json::from_slice(&bytes).map_err(|err| {
528            data_error(format!(
529                "failed to parse chunk payload '{}': {err}",
530                chunk_path.display()
531            ))
532        })?;
533        let coords = chunk_coords_from_entry(&chunk, meta.shape.len())?;
534        let chunk_start = chunk_start_for_coords(&coords, &meta.chunk_shape);
535        let chunk_extent = if chunk.shape.is_empty() {
536            chunk_extent_for_start(&chunk_start, &meta.chunk_shape, &meta.shape)
537        } else {
538            chunk.shape.clone()
539        };
540        if payload.shape != chunk_extent {
541            return Err(data_error(format!(
542                "chunk payload shape mismatch for key '{}': {:?} != {:?}",
543                chunk.key, payload.shape, chunk_extent
544            )));
545        }
546        let mut local = vec![0usize; chunk_extent.len()];
547        loop {
548            let mut global = Vec::with_capacity(chunk_extent.len());
549            for dim in 0..chunk_extent.len() {
550                global.push(chunk_start[dim] + local[dim]);
551            }
552            let src_linear = linear_index_column_major(&local, &chunk_extent)?;
553            let dst_linear = linear_index_column_major(&global, &meta.shape)?;
554            values[dst_linear] = payload.values[src_linear];
555            if !advance_index(&mut local, &chunk_extent) {
556                break;
557            }
558        }
559    }
560    Ok(DataArrayPayload {
561        dtype: meta.dtype.clone(),
562        shape: meta.shape.clone(),
563        values,
564    })
565}
566
567async fn maybe_upload_chunks_async(
568    root: &Path,
569    array: &str,
570    chunks: Vec<(DataChunkDescriptor, Vec<u8>)>,
571) -> BuiltinResult<()> {
572    if chunks.is_empty() {
573        return Ok(());
574    }
575    let request = DataChunkUploadRequest {
576        dataset_path: root.to_string_lossy().to_string(),
577        array: array.to_string(),
578        chunks: chunks.iter().map(|(desc, _)| desc.clone()).collect(),
579    };
580    let targets = match fs::data_chunk_upload_targets_async(&request).await {
581        Ok(targets) => targets,
582        Err(err) if err.kind() == std::io::ErrorKind::Unsupported => return Ok(()),
583        Err(err) => {
584            return Err(data_error(format!(
585                "failed to request data chunk upload targets: {err}"
586            )))
587        }
588    };
589    for (descriptor, bytes) in chunks {
590        let target = find_chunk_target(&targets, &descriptor.key)?;
591        fs::data_upload_chunk_async(target, &bytes)
592            .await
593            .map_err(|err| {
594                data_error(format!(
595                    "failed to upload chunk '{}': {err}",
596                    descriptor.key
597                ))
598            })?;
599        tracing::info!(
600            target: "runmat.data",
601            dataset = %root.display(),
602            array = array,
603            chunk_key = descriptor.key,
604            bytes = bytes.len(),
605            "data chunk uploaded"
606        );
607    }
608    Ok(())
609}
610
611fn find_chunk_target<'a>(
612    targets: &'a [DataChunkUploadTarget],
613    key: &str,
614) -> BuiltinResult<&'a DataChunkUploadTarget> {
615    targets
616        .iter()
617        .find(|target| target.key == key)
618        .ok_or_else(|| data_error(format!("missing upload target for chunk '{key}'")))
619}
620
621pub fn sha256_hex(bytes: &[u8]) -> String {
622    let mut hasher = Sha256::new();
623    hasher.update(bytes);
624    let digest = hasher.finalize();
625    format!("sha256:{:x}", digest)
626}
627
628fn chunk_key(coords: &[usize]) -> String {
629    coords
630        .iter()
631        .map(|v| v.to_string())
632        .collect::<Vec<_>>()
633        .join(".")
634}
635
636fn chunk_grid_shape(shape: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
637    shape
638        .iter()
639        .enumerate()
640        .map(|(idx, extent)| {
641            let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
642            extent.div_ceil(chunk)
643        })
644        .collect()
645}
646
647fn chunk_start_for_coords(coords: &[usize], chunk_shape: &[usize]) -> Vec<usize> {
648    coords
649        .iter()
650        .enumerate()
651        .map(|(idx, coord)| coord * chunk_shape.get(idx).copied().unwrap_or(1).max(1))
652        .collect()
653}
654
655fn chunk_extent_for_start(
656    start: &[usize],
657    chunk_shape: &[usize],
658    full_shape: &[usize],
659) -> Vec<usize> {
660    start
661        .iter()
662        .enumerate()
663        .map(|(idx, start)| {
664            let chunk = chunk_shape.get(idx).copied().unwrap_or(1).max(1);
665            let end = (*start + chunk).min(full_shape[idx]);
666            end.saturating_sub(*start)
667        })
668        .collect()
669}
670
671fn collect_chunk_values(
672    payload: &DataArrayPayload,
673    chunk_start: &[usize],
674    chunk_extent: &[usize],
675) -> BuiltinResult<Vec<f64>> {
676    let mut local = vec![0usize; chunk_extent.len()];
677    let mut values = Vec::with_capacity(chunk_extent.iter().copied().product());
678    loop {
679        let mut global = Vec::with_capacity(chunk_extent.len());
680        for dim in 0..chunk_extent.len() {
681            global.push(chunk_start[dim] + local[dim]);
682        }
683        let linear = linear_index_column_major(&global, &payload.shape)?;
684        values.push(payload.values[linear]);
685        if !advance_index(&mut local, chunk_extent) {
686            break;
687        }
688    }
689    Ok(values)
690}
691
692fn chunk_coords_from_entry(entry: &DataChunkIndexEntry, rank: usize) -> BuiltinResult<Vec<usize>> {
693    if !entry.coords.is_empty() {
694        if entry.coords.len() != rank {
695            return Err(data_error(format!(
696                "chunk coords rank mismatch for key '{}': expected {rank}, got {}",
697                entry.key,
698                entry.coords.len()
699            )));
700        }
701        return Ok(entry.coords.clone());
702    }
703    let coords = entry
704        .key
705        .split('.')
706        .map(|part| {
707            part.parse::<usize>()
708                .map_err(|_| data_error(format!("invalid chunk key '{}'", entry.key)))
709        })
710        .collect::<BuiltinResult<Vec<_>>>()?;
711    if coords.len() != rank {
712        return Err(data_error(format!(
713            "chunk key rank mismatch for key '{}': expected {rank}, got {}",
714            entry.key,
715            coords.len()
716        )));
717    }
718    Ok(coords)
719}
720
721fn normalize_slice_bounds(
722    full_shape: &[usize],
723    start: &[usize],
724    shape: &[usize],
725) -> BuiltinResult<(Vec<usize>, Vec<usize>)> {
726    if full_shape.is_empty() {
727        return Ok((Vec::new(), Vec::new()));
728    }
729    let mut normalized_start = Vec::with_capacity(full_shape.len());
730    let mut normalized_shape = Vec::with_capacity(full_shape.len());
731    for (axis, axis_len) in full_shape.iter().copied().enumerate() {
732        if axis_len == 0 {
733            return Err(data_error("slice axis length must be greater than zero"));
734        }
735        let requested_start = start.get(axis).copied().unwrap_or(0);
736        let clamped_start = requested_start.min(axis_len.saturating_sub(1));
737        let requested_span = shape.get(axis).copied().unwrap_or(axis_len);
738        let clamped_span = requested_span
739            .max(1)
740            .min(axis_len.saturating_sub(clamped_start));
741        normalized_start.push(clamped_start);
742        normalized_shape.push(clamped_span);
743    }
744    Ok((normalized_start, normalized_shape))
745}
746
747fn coordinate_in_slice(global: &[usize], slice_start: &[usize], slice_shape: &[usize]) -> bool {
748    for dim in 0..slice_shape.len() {
749        let start = slice_start[dim];
750        let end = start.saturating_add(slice_shape[dim]);
751        let value = global[dim];
752        if value < start || value >= end {
753            return false;
754        }
755    }
756    true
757}
758
759fn chunk_intersects_slice(
760    chunk_start: &[usize],
761    chunk_extent: &[usize],
762    slice_start: &[usize],
763    slice_shape: &[usize],
764) -> bool {
765    for dim in 0..slice_shape.len() {
766        let chunk_lo = chunk_start[dim];
767        let chunk_hi = chunk_lo.saturating_add(chunk_extent[dim]);
768        let slice_lo = slice_start[dim];
769        let slice_hi = slice_lo.saturating_add(slice_shape[dim]);
770        if chunk_hi <= slice_lo || slice_hi <= chunk_lo {
771            return false;
772        }
773    }
774    true
775}
776
777fn extract_slice_payload(
778    payload: &DataArrayPayload,
779    start: &[usize],
780    shape: &[usize],
781) -> BuiltinResult<DataArrayPayload> {
782    let mut values = Vec::with_capacity(shape.iter().copied().product());
783    if shape.is_empty() {
784        return Ok(DataArrayPayload {
785            dtype: payload.dtype.clone(),
786            shape: Vec::new(),
787            values,
788        });
789    }
790    let mut local = vec![0usize; shape.len()];
791    loop {
792        let mut global = Vec::with_capacity(shape.len());
793        for dim in 0..shape.len() {
794            global.push(start[dim] + local[dim]);
795        }
796        let linear = linear_index_column_major(&global, &payload.shape)?;
797        values.push(payload.values[linear]);
798        if !advance_index(&mut local, shape) {
799            break;
800        }
801    }
802    Ok(DataArrayPayload {
803        dtype: payload.dtype.clone(),
804        shape: shape.to_vec(),
805        values,
806    })
807}
808
809fn linear_index_column_major(index: &[usize], shape: &[usize]) -> BuiltinResult<usize> {
810    if index.len() != shape.len() {
811        return Err(data_error("chunk index rank mismatch"));
812    }
813    let mut stride = 1usize;
814    let mut linear = 0usize;
815    for (idx, extent) in index.iter().zip(shape.iter()) {
816        if *idx >= *extent {
817            return Err(data_error("chunk index out of bounds"));
818        }
819        linear += idx * stride;
820        stride = stride.saturating_mul(*extent);
821    }
822    Ok(linear)
823}
824
825fn advance_index(index: &mut [usize], shape: &[usize]) -> bool {
826    if shape.is_empty() {
827        return false;
828    }
829    for dim in 0..shape.len() {
830        index[dim] += 1;
831        if index[dim] < shape[dim] {
832            return true;
833        }
834        index[dim] = 0;
835    }
836    false
837}
838
839pub fn parse_schema(schema: &Value) -> BuiltinResult<DataSchema> {
840    let Value::Struct(schema_struct) = schema else {
841        return Err(data_error("data.create: schema must be a struct"));
842    };
843    let arrays_value = schema_struct
844        .fields
845        .get("arrays")
846        .ok_or_else(|| data_error("data.create: schema missing 'arrays' field"))?;
847    let Value::Struct(arrays_struct) = arrays_value else {
848        return Err(data_error("data.create: schema.arrays must be a struct"));
849    };
850
851    let mut arrays = BTreeMap::new();
852    for (name, meta_value) in &arrays_struct.fields {
853        let Value::Struct(meta_struct) = meta_value else {
854            return Err(data_error(format!(
855                "data.create: schema.arrays.{name} must be a struct"
856            )));
857        };
858        let dtype = meta_struct
859            .fields
860            .get("dtype")
861            .map(|v| parse_string(v, "data.create schema dtype"))
862            .transpose()?
863            .unwrap_or_else(|| "f64".to_string());
864        let shape = meta_struct
865            .fields
866            .get("shape")
867            .map(parse_usize_vector)
868            .transpose()?
869            .unwrap_or_else(|| vec![0, 0]);
870        let chunk_shape = meta_struct
871            .fields
872            .get("chunk")
873            .map(parse_usize_vector)
874            .transpose()?
875            .unwrap_or_else(|| default_chunk_shape(&shape));
876        let codec = meta_struct
877            .fields
878            .get("codec")
879            .map(|v| parse_string(v, "data.create schema codec"))
880            .transpose()?
881            .unwrap_or_else(|| "zstd".to_string());
882        let data_path = format!("arrays/{name}/data.f64.json");
883        let chunk_index_path = format!("arrays/{name}/chunks/index.json");
884        arrays.insert(
885            name.clone(),
886            DataArrayMeta {
887                dtype,
888                shape,
889                chunk_shape,
890                order: default_array_order(),
891                codec,
892                chunk_index_path: Some(chunk_index_path),
893                data_path,
894            },
895        );
896    }
897
898    Ok(DataSchema { arrays })
899}
900
901fn default_chunk_shape(shape: &[usize]) -> Vec<usize> {
902    if shape.is_empty() {
903        return vec![1024];
904    }
905    let mut out = shape.to_vec();
906    if out.len() == 1 {
907        out[0] = out[0].clamp(1, 65_536);
908        return out;
909    }
910    out[0] = out[0].clamp(1, 256);
911    out[1] = out[1].clamp(1, 256);
912    for dim in out.iter_mut().skip(2) {
913        *dim = (*dim).clamp(1, 8);
914    }
915    out
916}
917
918fn parse_usize_vector(value: &Value) -> BuiltinResult<Vec<usize>> {
919    match value {
920        Value::Tensor(t) => tensor_to_usize_vector(t),
921        Value::Num(n) => {
922            if *n < 0.0 || !n.is_finite() {
923                return Err(data_error(
924                    "data schema dimensions must be non-negative finite numbers",
925                ));
926            }
927            Ok(vec![*n as usize])
928        }
929        Value::Int(i) => {
930            let n = i.to_i64();
931            if n < 0 {
932                return Err(data_error("data schema dimensions must be non-negative"));
933            }
934            Ok(vec![n as usize])
935        }
936        _ => Err(data_error(
937            "data schema dimension field must be numeric tensor/vector",
938        )),
939    }
940}
941
942fn tensor_to_usize_vector(t: &Tensor) -> BuiltinResult<Vec<usize>> {
943    let mut out = Vec::with_capacity(t.data.len());
944    for value in &t.data {
945        if !value.is_finite() || *value < 0.0 {
946            return Err(data_error(
947                "data schema dimensions must be non-negative finite numbers",
948            ));
949        }
950        out.push(*value as usize);
951    }
952    Ok(out)
953}
954
955pub fn dataset_object(path: &str, manifest: &DataManifest) -> Value {
956    let mut obj = ObjectInstance::new("Dataset".to_string());
957    obj.properties
958        .insert("__data_path".to_string(), Value::String(path.to_string()));
959    obj.properties.insert(
960        "__data_id".to_string(),
961        Value::String(manifest.dataset_id.clone()),
962    );
963    obj.properties.insert(
964        "__data_version".to_string(),
965        Value::String(manifest_version_token(manifest)),
966    );
967    Value::Object(obj)
968}
969
970pub fn manifest_version_token(manifest: &DataManifest) -> String {
971    format!("{}:{}", manifest.updated_at, manifest.txn_sequence)
972}
973
974pub fn ensure_manifest_sequence(expected: u64, manifest: &DataManifest) -> BuiltinResult<()> {
975    if manifest.txn_sequence != expected {
976        tracing::warn!(
977            target: "runmat.data",
978            expected_sequence = expected,
979            actual_sequence = manifest.txn_sequence,
980            "manifest conflict detected"
981        );
982        return Err(data_error_with_identifier(
983            "MANIFEST_CONFLICT: dataset changed since transaction begin",
984            DATA_MANIFEST_CONFLICT_IDENTIFIER,
985        ));
986    }
987    Ok(())
988}
989
990pub fn array_object(dataset_path: &str, array_name: &str) -> Value {
991    let mut obj = ObjectInstance::new("DataArray".to_string());
992    obj.properties.insert(
993        "__data_path".to_string(),
994        Value::String(dataset_path.to_string()),
995    );
996    obj.properties.insert(
997        "__array_name".to_string(),
998        Value::String(array_name.to_string()),
999    );
1000    Value::Object(obj)
1001}
1002
1003pub fn transaction_object(dataset_path: &str, tx_id: &str) -> Value {
1004    let mut obj = ObjectInstance::new("DataTransaction".to_string());
1005    obj.properties.insert(
1006        "__data_path".to_string(),
1007        Value::String(dataset_path.to_string()),
1008    );
1009    obj.properties
1010        .insert("__tx_id".to_string(), Value::String(tx_id.to_string()));
1011    Value::Object(obj)
1012}
1013
1014pub fn get_object_prop<'a>(obj: &'a ObjectInstance, key: &str) -> BuiltinResult<&'a Value> {
1015    obj.properties
1016        .get(key)
1017        .ok_or_else(|| data_error(format!("object missing internal property '{key}'")))
1018}
1019
1020pub fn now_rfc3339() -> String {
1021    Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
1022}
1023
1024pub fn new_dataset_id() -> String {
1025    static NEXT_DATASET_ID: AtomicU64 = AtomicU64::new(1);
1026    let seq = NEXT_DATASET_ID.fetch_add(1, Ordering::Relaxed);
1027    format!("ds_{}_{}", Utc::now().timestamp_millis(), seq)
1028}
1029
1030pub fn new_tx_id() -> String {
1031    static NEXT_TX_ID: AtomicU64 = AtomicU64::new(1);
1032    let seq = NEXT_TX_ID.fetch_add(1, Ordering::Relaxed);
1033    format!("tx_{}_{}", Utc::now().timestamp_millis(), seq)
1034}
1035
1036pub fn start_tx(dataset_path: String, base_sequence: u64) -> BuiltinResult<String> {
1037    let tx_id = new_tx_id();
1038    let pending = PendingTxn {
1039        dataset_path,
1040        base_sequence,
1041        writes: Vec::new(),
1042        resizes: Vec::new(),
1043        fills: Vec::new(),
1044        create_arrays: Vec::new(),
1045        delete_arrays: Vec::new(),
1046        attrs: BTreeMap::new(),
1047        status: TxnStatus::Open,
1048    };
1049    with_tx_registry(|registry| {
1050        registry.insert(tx_id.clone(), pending);
1051    })?;
1052    Ok(tx_id)
1053}
1054
1055pub fn with_tx_mut<T>(
1056    tx_id: &str,
1057    f: impl FnOnce(&mut PendingTxn) -> BuiltinResult<T>,
1058) -> BuiltinResult<T> {
1059    with_tx_registry(|registry| {
1060        let tx = registry.get_mut(tx_id).ok_or_else(|| {
1061            data_error_with_identifier(
1062                format!("transaction '{tx_id}' not found"),
1063                DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1064            )
1065        })?;
1066        f(tx)
1067    })?
1068}
1069
1070pub fn with_tx<T>(
1071    tx_id: &str,
1072    f: impl FnOnce(&PendingTxn) -> BuiltinResult<T>,
1073) -> BuiltinResult<T> {
1074    #[cfg(not(target_arch = "wasm32"))]
1075    {
1076        if TASK_TX_REGISTRY.try_with(|_| ()).is_ok() {
1077            return TASK_TX_REGISTRY.with(|registry| {
1078                let registry = registry
1079                    .try_borrow()
1080                    .map_err(|_| data_error("data transaction registry is already borrowed"))?;
1081                let tx = registry.get(tx_id).ok_or_else(|| {
1082                    data_error_with_identifier(
1083                        format!("transaction '{tx_id}' not found"),
1084                        DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1085                    )
1086                })?;
1087                f(tx)
1088            });
1089        }
1090    }
1091
1092    FALLBACK_TX_REGISTRY.with(|registry| {
1093        let registry = registry
1094            .try_borrow()
1095            .map_err(|_| data_error("data transaction registry is already borrowed"))?;
1096        let tx = registry.get(tx_id).ok_or_else(|| {
1097            data_error_with_identifier(
1098                format!("transaction '{tx_id}' not found"),
1099                DATA_TRANSACTION_NOT_FOUND_IDENTIFIER,
1100            )
1101        })?;
1102        f(tx)
1103    })
1104}
1105
1106pub fn remove_tx(tx_id: &str) -> BuiltinResult<()> {
1107    with_tx_registry(|registry| {
1108        let _ = registry.remove(tx_id);
1109    })
1110}
1111
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal fixture manifest; only `txn_sequence` varies between tests.
    fn manifest_at_sequence(txn_sequence: u64) -> DataManifest {
        let stamp = "2026-03-01T00:00:00Z".to_string();
        DataManifest {
            schema_version: 1,
            format: "runmat-data".to_string(),
            dataset_id: "ds_test".to_string(),
            name: Some("test".to_string()),
            created_at: stamp.clone(),
            updated_at: stamp,
            arrays: BTreeMap::new(),
            attrs: BTreeMap::new(),
            txn_sequence,
        }
    }

    /// Checks that `tx_id` is open, removes it, then checks that a later lookup fails with
    /// the stable not-found identifier.
    fn assert_open_then_removed(tx_id: &str) {
        let status = with_tx(tx_id, |tx| Ok(tx.status.clone())).expect("tx lookup");
        assert_eq!(status, TxnStatus::Open);
        remove_tx(tx_id).expect("remove tx");
        let err = with_tx(tx_id, |_| Ok(())).expect_err("expected missing tx");
        assert_eq!(
            err.identifier(),
            Some(DATA_TRANSACTION_NOT_FOUND_IDENTIFIER),
            "missing transaction lookups should expose a stable identifier"
        );
    }

    #[test]
    fn ensure_manifest_sequence_accepts_matching_sequence() {
        let manifest = manifest_at_sequence(5);
        ensure_manifest_sequence(5, &manifest).expect("expected sequence match");
    }

    #[test]
    fn ensure_manifest_sequence_rejects_conflict() {
        let manifest = manifest_at_sequence(6);
        let err = ensure_manifest_sequence(5, &manifest).expect_err("expected conflict error");
        assert_eq!(
            err.identifier(),
            Some(DATA_MANIFEST_CONFLICT_IDENTIFIER),
            "manifest conflicts should expose a stable identifier"
        );
    }

    #[test]
    fn transaction_registry_roundtrip() {
        let tx_id = start_tx("/datasets/test.data".to_string(), 7).expect("start tx");
        assert_open_then_removed(&tx_id);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn transaction_registry_scope_survives_await() {
        with_tx_registry_scope(async {
            let tx_id = start_tx("/datasets/task-local.data".to_string(), 11).expect("start tx");
            // Yield so the task may resume on another worker; the scoped registry must follow.
            tokio::task::yield_now().await;
            assert_open_then_removed(&tx_id);
        })
        .await;
    }

    #[test]
    fn sha256_hash_format_matches_expected_prefix() {
        let hash = sha256_hex(b"runmat");
        let prefix = "sha256:";
        assert!(hash.starts_with(prefix));
        assert_eq!(hash.len(), prefix.len() + 64);
    }
}