use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
use super::types::*;
use crate::storage::zpl::ZPL;
lazy_static! {
static ref STREAM_REGISTRY: Mutex<BTreeMap<(u64, String), StreamData>> =
Mutex::new(BTreeMap::new());
}
#[derive(Clone)]
struct StreamData {
info: StreamInfo,
data: Vec<u8>,
}
pub fn list_streams(dataset: &str, object_id: u64) -> Result<Vec<StreamInfo>, StreamError> {
let _ = dataset;
let mut streams = Vec::new();
{
let zpl = ZPL.lock();
if let Some(znode) = zpl.get_znode(object_id) {
let primary_size = znode.phys.size;
let allocated = znode
.data_cache
.as_ref()
.map(|d| d.len() as u64)
.unwrap_or(primary_size);
streams.push(StreamInfo {
name: String::new(),
stream_type: StreamType::Data,
size: primary_size,
allocated_size: allocated,
created: znode.phys.crtime[0], modified: znode.phys.mtime[0], is_primary: true,
attributes: StreamAttributes::default(),
});
} else {
return Err(StreamError::FileNotFound(object_id));
}
}
{
let registry = STREAM_REGISTRY.lock();
for ((oid, name), stream_data) in registry.iter() {
if *oid == object_id && !name.is_empty() {
streams.push(stream_data.info.clone());
}
}
}
Ok(streams)
}
pub fn stream_exists(
dataset: &str,
object_id: u64,
stream_name: &str,
) -> Result<bool, StreamError> {
let _ = dataset;
if stream_name.is_empty() {
let zpl = ZPL.lock();
return Ok(zpl.get_znode(object_id).is_some());
}
let registry = STREAM_REGISTRY.lock();
Ok(registry.contains_key(&(object_id, stream_name.to_string())))
}
pub fn get_stream_info(
dataset: &str,
object_id: u64,
stream_name: &str,
) -> Result<StreamInfo, StreamError> {
let _ = dataset;
if stream_name.is_empty() {
let zpl = ZPL.lock();
let znode = zpl
.get_znode(object_id)
.ok_or(StreamError::FileNotFound(object_id))?;
return Ok(StreamInfo {
name: String::new(),
stream_type: StreamType::Data,
size: znode.phys.size,
allocated_size: znode
.data_cache
.as_ref()
.map(|d| d.len() as u64)
.unwrap_or(0),
created: znode.phys.crtime[0], modified: znode.phys.mtime[0], is_primary: true,
attributes: StreamAttributes::default(),
});
}
let registry = STREAM_REGISTRY.lock();
registry
.get(&(object_id, stream_name.to_string()))
.map(|sd| sd.info.clone())
.ok_or_else(|| StreamError::StreamNotFound(stream_name.to_string()))
}
pub fn create_stream(
dataset: &str,
object_id: u64,
stream_name: &str,
stream_type: StreamType,
) -> Result<(), StreamError> {
let _ = dataset;
validate_stream_name(stream_name)?;
if stream_name.is_empty() {
return Err(StreamError::StreamExists(String::new()));
}
{
let zpl = ZPL.lock();
if zpl.get_znode(object_id).is_none() {
return Err(StreamError::FileNotFound(object_id));
}
}
let mut registry = STREAM_REGISTRY.lock();
let key = (object_id, stream_name.to_string());
if registry.contains_key(&key) {
return Err(StreamError::StreamExists(stream_name.to_string()));
}
let stream_count = registry.keys().filter(|(oid, _)| *oid == object_id).count();
if stream_count >= MAX_STREAMS_PER_FILE as usize {
return Err(StreamError::TooManyStreams(MAX_STREAMS_PER_FILE));
}
let now = crate::time::now();
let info = StreamInfo {
name: stream_name.to_string(),
stream_type,
size: 0,
allocated_size: 0,
created: now,
modified: now,
is_primary: false,
attributes: StreamAttributes::default(),
};
registry.insert(
key,
StreamData {
info,
data: Vec::new(),
},
);
crate::lcpfs_println!(
"[ STREAMS ] Created stream '{}' for object {}",
stream_name,
object_id
);
Ok(())
}
pub fn delete_stream(dataset: &str, object_id: u64, stream_name: &str) -> Result<(), StreamError> {
let _ = dataset;
if stream_name.is_empty() {
return Err(StreamError::CannotDeletePrimary);
}
let mut registry = STREAM_REGISTRY.lock();
let key = (object_id, stream_name.to_string());
if registry.remove(&key).is_none() {
return Err(StreamError::StreamNotFound(stream_name.to_string()));
}
crate::lcpfs_println!(
"[ STREAMS ] Deleted stream '{}' from object {}",
stream_name,
object_id
);
Ok(())
}
pub fn read_stream(
dataset: &str,
object_id: u64,
stream_name: &str,
offset: u64,
length: usize,
) -> Result<Vec<u8>, StreamError> {
let _ = dataset;
if stream_name.is_empty() {
let zpl = ZPL.lock();
let znode = zpl
.get_znode(object_id)
.ok_or(StreamError::FileNotFound(object_id))?;
if let Some(ref data) = znode.data_cache {
let start = offset as usize;
if start >= data.len() {
return Ok(Vec::new());
}
let end = (start + length).min(data.len());
return Ok(data[start..end].to_vec());
}
return Ok(Vec::new());
}
let registry = STREAM_REGISTRY.lock();
let key = (object_id, stream_name.to_string());
let stream_data = registry
.get(&key)
.ok_or_else(|| StreamError::StreamNotFound(stream_name.to_string()))?;
let start = offset as usize;
if start >= stream_data.data.len() {
return Ok(Vec::new());
}
let end = (start + length).min(stream_data.data.len());
Ok(stream_data.data[start..end].to_vec())
}
pub fn write_stream(
dataset: &str,
object_id: u64,
stream_name: &str,
offset: u64,
data: &[u8],
) -> Result<usize, StreamError> {
let _ = dataset;
if stream_name.is_empty() {
let mut zpl = ZPL.lock();
let znode = zpl
.get_znode_mut(object_id)
.ok_or(StreamError::FileNotFound(object_id))?;
let start = offset as usize;
let end = start + data.len();
if znode.data_cache.is_none() {
znode.data_cache = Some(Vec::new());
}
if let Some(ref mut cache) = znode.data_cache {
if cache.len() < end {
cache.resize(end, 0);
}
cache[start..end].copy_from_slice(data);
znode.phys.size = cache.len() as u64;
znode.phys.mtime[0] = crate::time::now();
znode.dirty = true;
}
return Ok(data.len());
}
let mut registry = STREAM_REGISTRY.lock();
let key = (object_id, stream_name.to_string());
let stream_data = registry
.get_mut(&key)
.ok_or_else(|| StreamError::StreamNotFound(stream_name.to_string()))?;
let start = offset as usize;
let end = start + data.len();
if stream_data.data.len() < end {
stream_data.data.resize(end, 0);
}
stream_data.data[start..end].copy_from_slice(data);
stream_data.info.size = stream_data.data.len() as u64;
stream_data.info.allocated_size = stream_data.info.size;
stream_data.info.modified = crate::time::now();
Ok(data.len())
}
pub fn truncate_stream(
dataset: &str,
object_id: u64,
stream_name: &str,
length: u64,
) -> Result<(), StreamError> {
let _ = dataset;
if stream_name.is_empty() {
let mut zpl = ZPL.lock();
let znode = zpl
.get_znode_mut(object_id)
.ok_or(StreamError::FileNotFound(object_id))?;
if let Some(ref mut cache) = znode.data_cache {
cache.truncate(length as usize);
}
znode.phys.size = length;
znode.phys.mtime[0] = crate::time::now();
znode.dirty = true;
return Ok(());
}
let mut registry = STREAM_REGISTRY.lock();
let key = (object_id, stream_name.to_string());
let stream_data = registry
.get_mut(&key)
.ok_or_else(|| StreamError::StreamNotFound(stream_name.to_string()))?;
stream_data.data.truncate(length as usize);
stream_data.info.size = stream_data.data.len() as u64;
stream_data.info.allocated_size = stream_data.info.size;
stream_data.info.modified = crate::time::now();
Ok(())
}
pub fn rename_stream(
dataset: &str,
object_id: u64,
old_name: &str,
new_name: &str,
) -> Result<(), StreamError> {
let _ = dataset;
if old_name.is_empty() {
return Err(StreamError::CannotDeletePrimary);
}
validate_stream_name(new_name)?;
let mut registry = STREAM_REGISTRY.lock();
let old_key = (object_id, old_name.to_string());
let new_key = (object_id, new_name.to_string());
if !registry.contains_key(&old_key) {
return Err(StreamError::StreamNotFound(old_name.to_string()));
}
if registry.contains_key(&new_key) {
return Err(StreamError::StreamExists(new_name.to_string()));
}
if let Some(mut stream_data) = registry.remove(&old_key) {
stream_data.info.name = new_name.to_string();
stream_data.info.modified = crate::time::now();
registry.insert(new_key, stream_data);
}
Ok(())
}
pub fn copy_stream(
src_dataset: &str,
src_id: u64,
src_stream: &str,
dst_dataset: &str,
dst_id: u64,
dst_stream: &str,
) -> Result<u64, StreamError> {
let src_info = get_stream_info(src_dataset, src_id, src_stream)?;
let src_data = read_stream(src_dataset, src_id, src_stream, 0, src_info.size as usize)?;
if !dst_stream.is_empty() && !stream_exists(dst_dataset, dst_id, dst_stream)? {
create_stream(dst_dataset, dst_id, dst_stream, src_info.stream_type)?;
}
let written = write_stream(dst_dataset, dst_id, dst_stream, 0, &src_data)?;
Ok(written as u64)
}
pub fn get_total_stream_size(dataset: &str, object_id: u64) -> Result<u64, StreamError> {
let streams = list_streams(dataset, object_id)?;
Ok(streams.iter().map(|s| s.size).sum())
}
pub fn cleanup_file_streams(object_id: u64) {
let mut registry = STREAM_REGISTRY.lock();
let keys_to_remove: Vec<_> = registry
.keys()
.filter(|(oid, _)| *oid == object_id)
.cloned()
.collect();
for key in keys_to_remove {
registry.remove(&key);
}
}
pub fn parse_stream_path(path: &str) -> ParsedStreamPath {
let (file_path, stream_part) = if path.len() > 2 && path.as_bytes()[1] == b':' {
let rest = &path[2..];
if let Some(colon_pos) = rest.find(':') {
let file = &path[..colon_pos + 2];
let stream = &rest[colon_pos + 1..];
(file, Some(stream))
} else {
(path, None)
}
} else if let Some(colon_pos) = path.find(':') {
(&path[..colon_pos], Some(&path[colon_pos + 1..]))
} else {
(path, None)
};
let (stream_name, stream_type) = if let Some(stream) = stream_part {
if let Some(type_pos) = stream.rfind(':') {
let name = &stream[..type_pos];
let type_str = &stream[type_pos + 1..];
let stype = match type_str.to_uppercase().as_str() {
"$DATA" => Some(StreamType::Data),
"$EA" => Some(StreamType::ExtendedAttribute),
_ => None,
};
(Some(name.to_string()), stype)
} else {
(Some(stream.to_string()), None)
}
} else {
(None, None)
};
ParsedStreamPath {
file_path: file_path.to_string(),
stream_name,
stream_type,
}
}
pub fn build_stream_path(file_path: &str, stream_name: &str) -> String {
if stream_name.is_empty() {
file_path.to_string()
} else {
alloc::format!("{}:{}", file_path, stream_name)
}
}
pub fn copy_all_streams(
src_dataset: &str,
src_id: u64,
dst_dataset: &str,
dst_id: u64,
include_primary: bool,
) -> Result<u64, StreamError> {
let streams = list_streams(src_dataset, src_id)?;
let mut total_copied = 0u64;
for stream in streams {
if stream.is_primary && !include_primary {
continue;
}
let copied = copy_stream(
src_dataset,
src_id,
&stream.name,
dst_dataset,
dst_id,
&stream.name,
)?;
total_copied += copied;
}
Ok(total_copied)
}
pub fn delete_all_streams(dataset: &str, object_id: u64) -> Result<u64, StreamError> {
let streams = list_streams(dataset, object_id)?;
let mut deleted = 0u64;
for stream in streams {
if !stream.is_primary {
delete_stream(dataset, object_id, &stream.name)?;
deleted += 1;
}
}
Ok(deleted)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_stream_path_simple() {
let parsed = parse_stream_path("/data/file.txt:metadata");
assert_eq!(parsed.file_path, "/data/file.txt");
assert_eq!(parsed.stream_name.as_deref(), Some("metadata"));
}
#[test]
fn test_parse_stream_path_no_stream() {
let parsed = parse_stream_path("/data/file.txt");
assert_eq!(parsed.file_path, "/data/file.txt");
assert!(parsed.stream_name.is_none());
}
#[test]
fn test_parse_stream_path_windows() {
let parsed = parse_stream_path("C:\\data\\file.txt:metadata");
assert_eq!(parsed.file_path, "C:\\data\\file.txt");
assert_eq!(parsed.stream_name.as_deref(), Some("metadata"));
}
#[test]
fn test_build_stream_path() {
assert_eq!(
build_stream_path("/data/file.txt", "metadata"),
"/data/file.txt:metadata"
);
assert_eq!(build_stream_path("/data/file.txt", ""), "/data/file.txt");
}
#[test]
fn test_list_streams() {
let zpl = crate::storage::zpl::ZPL.lock();
let root_id = zpl.root_id();
drop(zpl);
let streams = list_streams("test", root_id).unwrap();
assert!(!streams.is_empty());
assert!(streams[0].is_primary);
}
}