use fusabi_host::{ExecutionContext, Result, Value, Error};
use std::sync::Arc;
use parking_lot::Mutex;
use std::collections::HashMap;
lazy_static::lazy_static! {
static ref STREAMS: Arc<Mutex<HashMap<i64, FileStream>>> = Arc::new(Mutex::new(HashMap::new()));
}
static NEXT_HANDLE: std::sync::atomic::AtomicI64 = std::sync::atomic::AtomicI64::new(1);
#[derive(Clone)]
struct FileStream {
path: String,
buffer_size: usize,
position: usize,
}
pub fn tail(args: &[Value], _ctx: &ExecutionContext) -> Result<Value> {
let path = args
.first()
.and_then(|v| v.as_str())
.ok_or_else(|| Error::host_function("fs_stream.tail: missing path argument"))?;
let buffer_size = args
.get(1)
.and_then(|v| v.as_int())
.unwrap_or(100) as usize;
let handle = NEXT_HANDLE.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let stream = FileStream {
path: path.to_string(),
buffer_size,
position: 0,
};
STREAMS.lock().insert(handle, stream);
tracing::debug!("fs_stream.tail: opened {} with buffer_size={}, handle={}", path, buffer_size, handle);
Ok(Value::Int(handle))
}
pub fn read_line(args: &[Value], _ctx: &ExecutionContext) -> Result<Value> {
let handle = args
.first()
.and_then(|v| v.as_int())
.ok_or_else(|| Error::host_function("fs_stream.read_line: missing handle argument"))?;
let mut streams = STREAMS.lock();
let stream = streams
.get_mut(&handle)
.ok_or_else(|| Error::host_function("fs_stream.read_line: invalid handle"))?;
stream.position += 1;
if stream.position % 3 == 0 {
Ok(Value::String(format!("Mock line {} from {}", stream.position, stream.path)))
} else {
Ok(Value::Null)
}
}
pub fn close(args: &[Value], _ctx: &ExecutionContext) -> Result<Value> {
let handle = args
.first()
.and_then(|v| v.as_int())
.ok_or_else(|| Error::host_function("fs_stream.close: missing handle argument"))?;
let mut streams = STREAMS.lock();
if streams.remove(&handle).is_some() {
tracing::debug!("fs_stream.close: closed handle {}", handle);
Ok(Value::Null)
} else {
Err(Error::host_function("fs_stream.close: invalid handle"))
}
}
pub fn read_available(args: &[Value], _ctx: &ExecutionContext) -> Result<Value> {
let handle = args
.first()
.and_then(|v| v.as_int())
.ok_or_else(|| Error::host_function("fs_stream.read_available: missing handle argument"))?;
let streams = STREAMS.lock();
let _stream = streams
.get(&handle)
.ok_or_else(|| Error::host_function("fs_stream.read_available: invalid handle"))?;
Ok(Value::List(vec![]))
}
pub fn open(args: &[Value], _ctx: &ExecutionContext) -> Result<Value> {
let path = args
.first()
.and_then(|v| v.as_str())
.ok_or_else(|| Error::host_function("fs_stream.open: missing path argument"))?;
let chunk_size = args
.get(1)
.and_then(|v| v.as_int())
.unwrap_or(4096) as usize;
let handle = NEXT_HANDLE.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let stream = FileStream {
path: path.to_string(),
buffer_size: chunk_size,
position: 0,
};
STREAMS.lock().insert(handle, stream);
tracing::debug!("fs_stream.open: opened {} with chunk_size={}, handle={}", path, chunk_size, handle);
Ok(Value::Int(handle))
}
pub fn read_chunk(args: &[Value], _ctx: &ExecutionContext) -> Result<Value> {
let handle = args
.first()
.and_then(|v| v.as_int())
.ok_or_else(|| Error::host_function("fs_stream.read_chunk: missing handle argument"))?;
let mut streams = STREAMS.lock();
let stream = streams
.get_mut(&handle)
.ok_or_else(|| Error::host_function("fs_stream.read_chunk: invalid handle"))?;
stream.position += stream.buffer_size;
if stream.position > stream.buffer_size * 5 {
Ok(Value::Null)
} else {
Ok(Value::String(format!("Mock chunk at position {}", stream.position)))
}
}