use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};
/// Global allocator that forwards every request to [`System`] while keeping a
/// running total of live heap bytes ([`ALLOCATED`]) and its high-water mark
/// ([`PEAK`]), so tests can measure how much memory a piece of code retains.
///
/// NOTE(review): the counters are process-wide, and libtest runs `#[test]`s on
/// parallel threads by default. Allocations made by concurrently running tests
/// therefore show up in each other's readings. Run with `--test-threads=1` when
/// the numbers must be exact.
struct TrackingAllocator;

/// Bytes currently allocated (and not yet freed) through [`TrackingAllocator`].
static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
/// Highest value [`ALLOCATED`] has reached at any tracked growth since start-up
/// or since it was last overwritten.
static PEAK: AtomicUsize = AtomicUsize::new(0);

/// Adds `size` newly allocated bytes to the live total and raises the peak if needed.
fn record_alloc(size: usize) {
    let current = ALLOCATED.fetch_add(size, Ordering::Relaxed) + size;
    PEAK.fetch_max(current, Ordering::Relaxed);
}

/// Removes `size` freed bytes from the live total.
fn record_dealloc(size: usize) {
    ALLOCATED.fetch_sub(size, Ordering::Relaxed);
}

// The counters are standalone statistics that never publish or guard other
// memory, so `Relaxed` ordering is sufficient for every access.
unsafe impl GlobalAlloc for TrackingAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract for `layout`;
        // the request is forwarded to `System` unchanged.
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            record_alloc(layout.size());
        }
        ptr
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // Forwarded (instead of relying on the default alloc + memset) so that
        // `System` can use its own zeroing allocation path.
        // SAFETY: same contract as `alloc`, forwarded unchanged.
        let ptr = unsafe { System.alloc_zeroed(layout) };
        if !ptr.is_null() {
            record_alloc(layout.size());
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: per the caller's contract, `ptr` came from this allocator
        // (and therefore from `System`) with this same `layout`.
        unsafe { System.dealloc(ptr, layout) };
        record_dealloc(layout.size());
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // The default `GlobalAlloc::realloc` always does alloc + copy + dealloc.
        // Forwarding lets `System` resize in place, matching the real allocator's
        // behaviour. Trade-off: if `System` itself moves the block, the brief
        // moment where old and new blocks coexist is not reflected in `PEAK`.
        // SAFETY: the caller upholds `GlobalAlloc::realloc`'s contract (`ptr` was
        // allocated here with `layout`, and `new_size` is valid for its alignment);
        // it is forwarded to `System` unchanged.
        let new_ptr = unsafe { System.realloc(ptr, layout, new_size) };
        // On failure the original block is untouched, so the counters stay as they are.
        if !new_ptr.is_null() {
            let old_size = layout.size();
            if new_size >= old_size {
                record_alloc(new_size - old_size);
            } else {
                record_dealloc(old_size - new_size);
            }
        }
        new_ptr
    }
}

#[global_allocator]
static GLOBAL: TrackingAllocator = TrackingAllocator;
/// Returns the number of heap bytes currently live according to the tracking
/// allocator's `ALLOCATED` counter.
fn current_allocated() -> usize {
    ALLOCATED.load(Ordering::Relaxed)
}

/// Restarts peak tracking from "now": overwrites `PEAK` with the current live
/// byte count. A later `peak_allocated()` then reports the highest value
/// recorded by the allocator's `fetch_max` updates since this call.
fn reset_peak() {
    let live_now = current_allocated();
    PEAK.store(live_now, Ordering::Relaxed);
}

/// Returns the high-water mark of live heap bytes stored in `PEAK`.
fn peak_allocated() -> usize {
    PEAK.load(Ordering::Relaxed)
}
use somatize_core::cache::CacheKey;
use somatize_core::error::Result;
use somatize_core::filter::{Distribution, Filter, FilterKind, FilterMeta, StreamMode};
use somatize_core::value::Value;
use std::sync::Arc;
/// Stateless test filter: multiplies every element of a tensor by two and
/// passes any non-tensor value through unchanged.
struct Doubler;

impl Filter for Doubler {
    fn config_hash(&self) -> CacheKey {
        CacheKey::from_parts(&[b"Doubler"])
    }

    /// Nothing to learn; the fitted state is always `Value::Empty`.
    fn fit(&self, _x: &Value, _y: Option<&Value>) -> Result<Value> {
        Ok(Value::Empty)
    }

    /// Returns a new tensor with each element doubled and the same shape;
    /// non-tensor inputs are returned as clones.
    fn forward(&self, x: &Value, _state: &Value) -> Result<Value> {
        if let Value::Tensor { values, shape } = x {
            let doubled = values.iter().map(|v| v * 2.0).collect();
            Ok(Value::tensor(doubled, shape.clone()))
        } else {
            Ok(x.clone())
        }
    }

    fn meta(&self) -> FilterMeta {
        FilterMeta {
            name: "Doubler".into(),
            kind: FilterKind::Stateless,
            stream_mode: StreamMode::FixedState,
            distribution: Distribution::Local,
            cacheable: false,
            differentiable: false,
            input_schema: None,
            output_schema: None,
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
/// Trainable test filter. `fit` stores the arithmetic mean of a tensor's
/// elements as JSON, and `forward` subtracts that stored mean from every element.
struct MeanNormalizer;

impl Filter for MeanNormalizer {
    fn config_hash(&self) -> CacheKey {
        CacheKey::from_parts(&[b"MeanNorm"])
    }

    /// Returns `{"mean": <f64>}` for tensor input and `Value::Empty` otherwise.
    /// The divisor is clamped to 1, so an empty tensor yields a mean of zero
    /// rather than NaN.
    fn fit(&self, x: &Value, _y: Option<&Value>) -> Result<Value> {
        match x.as_tensor() {
            None => Ok(Value::Empty),
            Some((data, _)) => {
                let total: f64 = data.iter().sum();
                let count = data.len().max(1) as f64;
                let mean = total / count;
                Ok(Value::json(serde_json::json!({ "mean": mean })))
            }
        }
    }

    /// Subtracts the fitted mean from every tensor element; non-tensor inputs
    /// are returned as clones.
    fn forward(&self, x: &Value, state: &Value) -> Result<Value> {
        // A state that is not JSON, or has no numeric "mean" entry, leaves values unshifted.
        let mean = match state.as_json() {
            Some(j) => j["mean"].as_f64().unwrap_or(0.0),
            None => 0.0,
        };
        if let Value::Tensor { values, shape } = x {
            let centered = values.iter().map(|v| v - mean).collect();
            Ok(Value::tensor(centered, shape.clone()))
        } else {
            Ok(x.clone())
        }
    }

    fn meta(&self) -> FilterMeta {
        FilterMeta {
            name: "MeanNorm".into(),
            kind: FilterKind::Trainable,
            stream_mode: StreamMode::FixedState,
            distribution: Distribution::Local,
            cacheable: false,
            differentiable: false,
            input_schema: None,
            output_schema: None,
        }
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}
use somatize_core::graph::{Graph, Node};
use somatize_runtime::cache::MemoryCache;
use somatize_runtime::filter_library::FilterLibrary;
use somatize_runtime::graph_session::GraphSession;
/// Builds a session over a single-node graph (`"double"`, backed by [`Doubler`])
/// with a `MemoryCache` of capacity `64 * 1024 * 1024` attached (presumably
/// bytes, i.e. 64 MiB — confirm against `MemoryCache::new`).
fn make_doubler_session() -> GraphSession {
    let mut lib = FilterLibrary::new();
    lib.register("double", Box::new(Doubler));

    let mut graph = Graph::new();
    graph.nodes.push(Node::new("double", "Double", "double"));

    let cache = Arc::new(MemoryCache::new(64 * 1024 * 1024));
    GraphSession::new(graph, lib).with_cache(cache)
}
/// Builds a two-node session in which `"norm"` ([`MeanNormalizer`]) feeds
/// `"double"` ([`Doubler`]) through the data edge `"norm_to_double"`. A
/// `MemoryCache` of capacity `64 * 1024 * 1024` (presumably bytes) is attached.
fn make_pipeline_session() -> GraphSession {
    use somatize_core::graph::Edge;

    let mut lib = FilterLibrary::new();
    lib.register("norm", Box::new(MeanNormalizer));
    lib.register("double", Box::new(Doubler));

    let mut graph = Graph::new();
    graph.nodes.push(Node::new("norm", "Normalize", "norm"));
    graph.nodes.push(Node::new("double", "Double", "double"));
    graph.edges.push(Edge::data("norm_to_double", "norm", "double"));

    let cache = Arc::new(MemoryCache::new(64 * 1024 * 1024));
    GraphSession::new(graph, lib).with_cache(cache)
}
/// Streams a 50_000-element tensor through the doubler in 1000-row chunks. It
/// then checks that the live heap bytes afterwards stay below eight times the
/// output's raw `f64` size. The measurement is taken while the returned output
/// is still held.
///
/// NOTE(review): the allocation counters are process-wide, and libtest runs
/// `#[test]`s on parallel threads by default. Allocations made by other tests
/// during the measured window are counted too; run with `--test-threads=1` if
/// this proves flaky.
#[test]
fn stream_memory_does_not_grow_with_chunks() {
    use somatize_runtime::forward::Stream;

    let chunk_rows = 1000;
    let n_chunks = 50;
    let total_rows = chunk_rows * n_chunks;
    let expected_output_bytes = total_rows * std::mem::size_of::<f64>();
    let limit = expected_output_bytes * 8;
    let stream = Stream {
        chunk_size: chunk_rows,
    };

    let session = make_doubler_session();
    let input = Value::tensor((0..total_rows).map(|i| i as f64).collect(), vec![total_rows]);

    // Single-chunk warm-up before the baseline, presumably so one-time lazy
    // setup inside the session is not counted as growth.
    let small = Value::tensor(vec![1.0; chunk_rows], vec![chunk_rows]);
    let _ = session.forward_with(&small, &stream);

    reset_peak();
    let before = current_allocated();
    let result = session.forward_with(&input, &stream);
    assert!(result.is_ok());
    let after = current_allocated();
    let peak = peak_allocated();

    let growth = after.saturating_sub(before);
    eprintln!(
        "Stream: before={before}B, after={after}B, peak={peak}B, growth={growth}B, expected_output={expected_output_bytes}B"
    );
    assert!(
        growth < limit,
        "Memory grew {growth}B after streaming {n_chunks} chunks — possible leak \
         (expected < {}B)",
        limit
    );
}
/// Runs 100 `forward()` calls on fresh 1000-element batches. It then checks
/// that live heap bytes afterwards exceed the baseline by less than ten
/// batches' worth of raw `f64` data, i.e. that per-call allocations are
/// released rather than retained.
///
/// NOTE(review): the allocation counters are process-wide, and libtest runs
/// tests in parallel by default, so other tests' allocations can appear in
/// `growth`. Use `--test-threads=1` if this is flaky.
#[test]
fn repeated_forward_memory_does_not_grow() {
    let batch_size = 1000;
    let n_batches = 100;
    let single_batch_bytes = batch_size * std::mem::size_of::<f64>();
    let limit = single_batch_bytes * 10;

    let session = make_doubler_session();
    // Warm-up call before the baseline, presumably so one-time lazy setup is
    // not charged to the measured loop.
    let warm = Value::tensor(vec![1.0; batch_size], vec![batch_size]);
    let _ = session.forward(&warm);

    reset_peak();
    let before = current_allocated();
    for i in 0..n_batches {
        let start = i * batch_size;
        let batch = Value::tensor(
            (start..start + batch_size).map(|k| k as f64).collect(),
            vec![batch_size],
        );
        let result = session.forward(&batch);
        assert!(result.is_ok());
    }
    let after = current_allocated();
    let peak = peak_allocated();

    let growth = after.saturating_sub(before);
    eprintln!(
        "Batched: before={before}B, after={after}B, peak={peak}B, growth={growth}B, single_batch={single_batch_bytes}B"
    );
    assert!(
        growth < limit,
        "Memory grew {growth}B after {n_batches} forward() calls — possible leak \
         (expected < {}B)",
        limit
    );
}
/// Streams a 50_000-element tensor through the doubler in 500-row chunks. It
/// checks that the peak of live heap bytes during the call rises less than
/// eight times the input's raw `f64` size above the pre-call baseline.
///
/// NOTE(review): `PEAK` is process-wide. Another test calling `reset_peak()` or
/// allocating concurrently (libtest runs tests in parallel by default) can skew
/// this reading; use `--test-threads=1` if it proves flaky.
#[test]
fn stream_peak_memory_bounded() {
    use somatize_runtime::forward::Stream;

    let chunk_rows = 500;
    let total_rows = 50_000;
    let input_bytes = total_rows * std::mem::size_of::<f64>();
    let limit = input_bytes * 8;
    let stream = Stream {
        chunk_size: chunk_rows,
    };

    let session = make_doubler_session();
    let input = Value::tensor((0..total_rows).map(|i| i as f64).collect(), vec![total_rows]);

    // Single-chunk warm-up before the baseline, presumably so one-time lazy
    // setup is not mistaken for streaming overhead.
    let small = Value::tensor(vec![1.0; chunk_rows], vec![chunk_rows]);
    let _ = session.forward_with(&small, &stream);

    reset_peak();
    let before = current_allocated();
    let result = session.forward_with(&input, &stream);
    assert!(result.is_ok());
    let peak = peak_allocated();
    let peak_growth = peak.saturating_sub(before);

    eprintln!("Peak: before={before}B, peak={peak}B, growth={peak_growth}B, input={input_bytes}B");
    assert!(
        peak_growth < limit,
        "Peak memory {peak_growth}B during streaming is too high — may accumulate \
         extra data (expected < {}B)",
        limit
    );
}
/// Fits the norm → double pipeline on the values 0..5000, then runs 50 forward
/// passes on fresh 1000-element batches. It checks that live heap bytes
/// afterwards exceed the baseline by less than fifteen batches' worth of raw
/// `f64` data.
///
/// NOTE(review): the allocation counters are process-wide, and libtest runs
/// tests in parallel by default; use `--test-threads=1` if this is flaky.
#[test]
fn pipeline_fit_then_repeated_forward_stable() {
    let n_passes = 50;
    let batch_size = 1000;
    let single_batch_bytes = batch_size * std::mem::size_of::<f64>();
    let limit = single_batch_bytes * 15;

    let mut session = make_pipeline_session();
    let train_data = Value::tensor((0..5000).map(|i| i as f64).collect(), vec![5000]);
    let fit_result = session.fit(&train_data, None);
    assert!(fit_result.is_ok());

    // Warm-up pass before the baseline is taken.
    let warm = Value::tensor(vec![1.0; 1000], vec![1000]);
    let _ = session.forward(&warm);

    reset_peak();
    let before = current_allocated();
    for i in 0..n_passes {
        let start = i * batch_size;
        let batch = Value::tensor(
            (start..start + batch_size).map(|k| k as f64).collect(),
            vec![batch_size],
        );
        let result = session.forward(&batch);
        assert!(result.is_ok());
    }
    let after = current_allocated();

    let growth = after.saturating_sub(before);
    eprintln!("Pipeline: before={before}B, after={after}B, growth={growth}B");
    assert!(
        growth < limit,
        "Pipeline memory grew {growth}B after {n_passes} forward passes — possible leak \
         (expected < {}B)",
        limit
    );
}
/// Checks that cloning a large tensor `Value` does not copy its element buffer.
/// 100 clones of a 100_000-element tensor must allocate less than a quarter of
/// one buffer's raw `f64` size; the failure message expects an `Arc`-style
/// shared clone.
///
/// The holder `Vec` is reserved *before* the baseline is taken, so its own
/// backing buffer is not charged to `Value::clone`. The peak counter is
/// deliberately not reset: it is never read here, and resetting the
/// process-wide `PEAK` would disturb a concurrently running peak test.
///
/// NOTE(review): `ALLOCATED` is process-wide, and libtest runs tests in
/// parallel by default; use `--test-threads=1` if this is flaky.
#[test]
fn value_clone_is_cheap() {
    const ELEMS: usize = 100_000;
    const CLONES: usize = 100;

    let big = Value::tensor(vec![42.0; ELEMS], vec![ELEMS]);
    // Exact capacity, so the pushes below never reallocate inside the window.
    let mut clones = Vec::with_capacity(CLONES);

    let before = current_allocated();
    for _ in 0..CLONES {
        clones.push(big.clone());
    }
    let after = current_allocated();

    let growth = after.saturating_sub(before);
    let data_size = ELEMS * std::mem::size_of::<f64>();
    eprintln!("Clone: before={before}B, after={after}B, growth={growth}B, data_size={data_size}B");
    assert!(
        growth < data_size / 4,
        "Cloning Value allocated {growth}B — expected near-zero \
         (Arc clone). data_size={data_size}B"
    );
    drop(clones);
}