use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::Arc;
use std::time::Duration;
use crate::parser::WasmaConfig;
use crate::uclient::SectionMemory;
pub mod posix {
use std::io;
use std::os::unix::io::RawFd;
pub fn posix_open(path: &str, flags: i32) -> io::Result<RawFd> {
use std::ffi::CString;
let c_path =
CString::new(path).map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
let fd = unsafe { libc::open(c_path.as_ptr(), flags) };
if fd < 0 {
Err(io::Error::last_os_error())
} else {
Ok(fd)
}
}
pub fn posix_read(fd: RawFd, buf: &mut [u8]) -> io::Result<usize> {
let n = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) };
match n {
-1 => Err(io::Error::last_os_error()),
0 => Ok(0), n => Ok(n as usize),
}
}
pub fn posix_read_exact(fd: RawFd, buf: &mut [u8]) -> io::Result<()> {
let mut total = 0;
while total < buf.len() {
match posix_read(fd, &mut buf[total..]) {
Ok(0) => {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"Beklenmedik EOF: POSIX read_exact",
))
}
Ok(n) => total += n,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
}
}
Ok(())
}
pub fn posix_close(fd: RawFd) -> io::Result<()> {
let ret = unsafe { libc::close(fd) };
if ret == 0 {
Ok(())
} else {
Err(io::Error::last_os_error())
}
}
pub fn posix_set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
if flags < 0 {
return Err(io::Error::last_os_error());
}
let new_flags = if nonblocking {
flags | libc::O_NONBLOCK
} else {
flags & !libc::O_NONBLOCK
};
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, new_flags) };
if ret < 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
pub fn posix_poll_readable(fd: RawFd, timeout_ms: i32) -> io::Result<bool> {
let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let ret = unsafe { libc::poll(&mut pfd, 1, timeout_ms) };
match ret {
-1 => Err(io::Error::last_os_error()),
0 => Ok(false), _ => Ok(pfd.revents & libc::POLLIN != 0),
}
}
}
pub trait UClientEngine {
fn start_engine(&mut self) -> Result<(), Box<dyn std::error::Error>>;
fn dispatch_data(&self, data: &[u8]);
fn memory_usage(&self) -> (usize, usize, usize);
fn get_config(&self) -> &WasmaConfig;
fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error>> {
println!("🛑 UClientEngine: Motor stopping...");
Ok(())
}
fn is_active(&self) -> bool {
true
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum RawAppSource {
UnixSocket(String),
CharDevice(String),
NamedPipe(String),
TcpSocket { ip: String, port: u16 },
Stdin,
}
impl RawAppSource {
pub fn display_name(&self) -> String {
match self {
Self::UnixSocket(p) => format!("unix:{}", p),
Self::CharDevice(p) => format!("chardev:{}", p),
Self::NamedPipe(p) => format!("fifo:{}", p),
Self::TcpSocket { ip, port } => format!("tcp:{}:{}", ip, port),
Self::Stdin => "stdin".to_string(),
}
}
}
pub struct RawAppDescriptor {
pub app_id: String,
pub source: RawAppSource,
fd: Option<RawFd>,
nonblocking: bool,
}
impl RawAppDescriptor {
pub fn new(app_id: impl Into<String>, source: RawAppSource) -> Self {
Self {
app_id: app_id.into(),
source,
fd: None,
nonblocking: false,
}
}
pub fn open(&mut self) -> Result<RawFd, std::io::Error> {
if let Some(fd) = self.fd {
return Ok(fd); }
let fd = match &self.source {
RawAppSource::UnixSocket(path) => self.connect_unix_socket(path)?,
RawAppSource::CharDevice(path) => posix::posix_open(path, libc::O_RDONLY)?,
RawAppSource::NamedPipe(path) => {
posix::posix_open(path, libc::O_RDONLY | libc::O_NONBLOCK)?
}
RawAppSource::TcpSocket { ip, port } => self.connect_tcp(ip, *port)?,
RawAppSource::Stdin => {
0 }
};
self.fd = Some(fd);
println!(
"📂 RawAppDescriptor: '{}' opened (fd={})",
self.source.display_name(),
fd
);
Ok(fd)
}
fn connect_unix_socket(&self, path: &str) -> Result<RawFd, std::io::Error> {
use std::os::unix::net::UnixStream;
let stream = UnixStream::connect(path)?;
let fd = stream.as_raw_fd();
let _ = std::mem::ManuallyDrop::new(stream);
Ok(fd)
}
fn connect_tcp(&self, ip: &str, port: u16) -> Result<RawFd, std::io::Error> {
use std::net::TcpStream;
let addr = format!("{}:{}", ip, port);
let stream = TcpStream::connect(&addr)?;
let fd = stream.as_raw_fd();
let _ = std::mem::ManuallyDrop::new(stream);
Ok(fd)
}
pub fn set_nonblocking(&mut self, nb: bool) -> Result<(), std::io::Error> {
if let Some(fd) = self.fd {
posix::posix_set_nonblocking(fd, nb)?;
}
self.nonblocking = nb;
Ok(())
}
pub fn fd(&self) -> Option<RawFd> {
self.fd
}
pub fn poll_readable(&self, timeout_ms: i32) -> Result<bool, std::io::Error> {
match self.fd {
Some(fd) => posix::posix_poll_readable(fd, timeout_ms),
None => Err(std::io::Error::new(
std::io::ErrorKind::NotConnected,
"Descriptor henüz açılmadı",
)),
}
}
}
impl Drop for RawAppDescriptor {
fn drop(&mut self) {
if let Some(fd) = self.fd.take() {
if fd != 0 {
let _ = posix::posix_close(fd);
println!(
"📁 RawAppDescriptor: '{}' closed (fd={})",
self.source.display_name(),
fd
);
}
}
}
}
pub struct RawAppBuffer {
buf_a: Vec<u8>,
buf_b: Vec<u8>,
active: bool, buf_size: usize,
}
impl RawAppBuffer {
pub fn new(buf_size: usize) -> Self {
Self {
buf_a: vec![0u8; buf_size],
buf_b: vec![0u8; buf_size],
active: true,
buf_size,
}
}
pub fn write_buf(&mut self) -> &mut [u8] {
if self.active {
&mut self.buf_a
} else {
&mut self.buf_b
}
}
pub fn read_buf(&self) -> &[u8] {
if self.active {
&self.buf_b
} else {
&self.buf_a
}
}
pub fn swap(&mut self) {
self.active = !self.active;
}
pub fn buf_size(&self) -> usize {
self.buf_size
}
}
pub struct RawAppClient {
config: Arc<WasmaConfig>,
descriptor: RawAppDescriptor,
memory: SectionMemory,
buffer: RawAppBuffer,
active: bool,
}
impl RawAppClient {
pub fn new(config: WasmaConfig) -> Self {
let level = config.resource_limits.scope_level;
let source = RawAppSource::UnixSocket("/run/wasma/app.sock".to_string());
Self {
descriptor: RawAppDescriptor::new("wasma.raw.app", source),
memory: SectionMemory::new(level),
buffer: RawAppBuffer::new(Self::buf_size_for_level(level)),
config: Arc::new(config),
active: false,
}
}
pub fn from_config(config: Arc<WasmaConfig>) -> Self {
let level = config.resource_limits.scope_level;
let source = RawAppSource::UnixSocket("/run/wasma/app.sock".to_string());
Self {
descriptor: RawAppDescriptor::new("wasma.raw.app", source),
memory: SectionMemory::new(level),
buffer: RawAppBuffer::new(Self::buf_size_for_level(level)),
config,
active: false,
}
}
pub fn with_source(mut self, source: RawAppSource) -> Self {
self.descriptor = RawAppDescriptor::new(self.descriptor.app_id.clone(), source);
self
}
fn buf_size_for_level(level: u32) -> usize {
match level {
0 => 4096, 1..=10 => 8 * 1024, 11..=50 => 64 * 1024, _ => 256 * 1024, }
}
fn process_raw_stream(&self, data: &[u8]) {
self.dispatch_data(data);
}
fn process_partitioned(&mut self, fd: RawFd) -> Result<(), Box<dyn std::error::Error>> {
for i in 0..self.memory.cell_count {
let cell = self.memory.get_cell_mut(i);
posix::posix_read_exact(fd, cell)?;
let cell_data = self.memory.get_cell(i).to_vec();
self.dispatch_data(&cell_data);
}
Ok(())
}
}
impl UClientEngine for RawAppClient {
fn start_engine(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let level = self.config.resource_limits.scope_level;
println!(
"🔌 RawAppClient: Source opened → {}",
self.descriptor.source.display_name()
);
let fd = self.descriptor.open()?;
self.active = true;
println!("🟢 WASMA RawAppClient: Motor Başladı");
println!(
"📡 Mod: {}",
if level == 0 {
"NULL_EXCEPTION (Bypass/Raw)"
} else {
"Bölümlenmiş (Partitioned)"
}
);
println!("🎨 Renderer: {}", self.config.resource_limits.renderer);
println!("📦 Tampon boyutu: {} byte", self.buffer.buf_size());
if level == 0 {
loop {
match self.descriptor.poll_readable(100) {
Ok(false) => continue, Ok(true) => {}
Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(Box::new(e)),
}
let write_buf = self.buffer.write_buf();
match posix::posix_read(fd, write_buf) {
Ok(0) => {
println!("📭 EOF getting, motor stopping.");
break;
}
Ok(n) => {
self.buffer.swap();
let data = self.buffer.read_buf()[..n].to_vec();
self.process_raw_stream(&data);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(1));
continue;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
Err(e) => return Err(Box::new(e)),
}
}
} else {
loop {
match self.process_partitioned(fd) {
Ok(_) => {}
Err(e) if e.to_string().contains("Beklenmedik EOF") => {
println!("📭 Data flowing completed..");
break;
}
Err(e) => return Err(e),
}
}
}
self.active = false;
Ok(())
}
fn dispatch_data(&self, data: &[u8]) {
match self.config.resource_limits.renderer.as_str() {
"glx_renderer" => {
#[cfg(feature = "glx")]
{
println!("🎨 GLX dispatch: {} byte", data.len());
}
#[cfg(not(feature = "glx"))]
{
self.dispatch_cpu_fallback(data);
}
}
"renderer_opencl" | "opencl" => {
#[cfg(feature = "opencl-gpu")]
{
println!("🎮 OpenCL dispatch: {} byte", data.len());
}
#[cfg(not(feature = "opencl-gpu"))]
{
self.dispatch_cpu_fallback(data);
}
}
"renderer_iuhd" | "intel_uhd" => {
#[cfg(feature = "intel-uhd")]
{
println!("💻 Intel UHD dispatch: {} byte", data.len());
}
#[cfg(not(feature = "intel-uhd"))]
{
self.dispatch_cpu_fallback(data);
}
}
"cpu_renderer" | "cpu" | _ => {
self.dispatch_cpu_fallback(data);
}
}
}
fn memory_usage(&self) -> (usize, usize, usize) {
(
self.memory.raw_storage.len(),
self.memory.cell_count,
self.memory.cell_size,
)
}
fn get_config(&self) -> &WasmaConfig {
&self.config
}
fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.active = false;
println!(
"🛑 RawAppClient closed: {}",
self.descriptor.source.display_name()
);
Ok(())
}
fn is_active(&self) -> bool {
self.active
}
}
impl RawAppClient {
fn dispatch_cpu_fallback(&self, data: &[u8]) {
for chunk in data.chunks(1024) {
let _sum: u32 = chunk.iter().map(|&x| x as u32).sum();
}
}
pub fn descriptor(&self) -> &RawAppDescriptor {
&self.descriptor
}
pub fn descriptor_mut(&mut self) -> &mut RawAppDescriptor {
&mut self.descriptor
}
}
pub struct RawAppClientBuilder {
config: Option<WasmaConfig>,
source: Option<RawAppSource>,
app_id: Option<String>,
}
impl RawAppClientBuilder {
pub fn new() -> Self {
Self {
config: None,
source: None,
app_id: None,
}
}
pub fn with_config(mut self, config: WasmaConfig) -> Self {
self.config = Some(config);
self
}
pub fn with_source(mut self, source: RawAppSource) -> Self {
self.source = Some(source);
self
}
pub fn with_app_id(mut self, id: impl Into<String>) -> Self {
self.app_id = Some(id.into());
self
}
pub fn build(self) -> Result<RawAppClient, String> {
let config = self.config.ok_or("Config required")?;
let mut client = RawAppClient::new(config);
if let Some(source) = self.source {
let app_id = self.app_id.unwrap_or_else(|| "wasma.raw.app".to_string());
client.descriptor = RawAppDescriptor::new(app_id, source);
}
Ok(client)
}
}
impl Default for RawAppClientBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::parser::ConfigParser;
fn make_config() -> WasmaConfig {
let parser = ConfigParser::new(None);
let content = parser.generate_default_config();
parser.parse(&content).unwrap()
}
#[test]
fn test_raw_app_client_creation() {
let config = make_config();
let client = RawAppClient::new(config);
let (total, cells, cell_size) = client.memory_usage();
assert!(total > 0);
assert!(cells > 0);
assert_eq!(cell_size, 1024 * 1024);
assert!(!client.is_active());
println!(
"✅ RawAppClient building...: cells={}, cell_size={}KB",
cells,
cell_size / 1024
);
}
#[test]
fn test_builder_pattern() {
let config = make_config();
let client = RawAppClientBuilder::new()
.with_config(config)
.with_source(RawAppSource::Stdin)
.with_app_id("test.app")
.build()
.unwrap();
assert_eq!(client.descriptor().app_id, "test.app");
assert_eq!(client.descriptor().source, RawAppSource::Stdin);
println!("✅ Builder pattern running");
}
#[test]
fn test_section_memory_compatibility() {
let mem = SectionMemory::new(10);
assert_eq!(mem.cell_count, 10);
assert_eq!(mem.cell_size, 1024 * 1024);
println!("✅ SectionMemory compability verifying");
}
#[test]
fn test_raw_app_buffer_pingpong() {
let mut buf = RawAppBuffer::new(4096);
buf.write_buf()[0] = 0xAB;
buf.swap();
assert_eq!(buf.read_buf()[0], 0xAB);
buf.write_buf()[0] = 0xCD;
buf.swap();
assert_eq!(buf.read_buf()[0], 0xCD);
println!("✅ Ping-pong buffer running");
}
#[test]
fn test_dispatch_cpu_fallback() {
let config = make_config();
let client = RawAppClient::new(config);
let data = vec![1u8; 2048];
client.dispatch_data(&data);
println!("✅ CPU fallback dispatch running");
}
#[test]
fn test_buf_size_for_level() {
assert_eq!(RawAppClient::buf_size_for_level(0), 4096);
assert_eq!(RawAppClient::buf_size_for_level(5), 8 * 1024);
assert_eq!(RawAppClient::buf_size_for_level(25), 64 * 1024);
assert_eq!(RawAppClient::buf_size_for_level(100), 256 * 1024);
println!("✅ Buffer sizing verifying");
}
#[test]
fn test_source_display_names() {
assert_eq!(
RawAppSource::UnixSocket("/run/wasma/app.sock".into()).display_name(),
"unix:/run/wasma/app.sock"
);
assert_eq!(RawAppSource::Stdin.display_name(), "stdin");
assert_eq!(
RawAppSource::TcpSocket {
ip: "127.0.0.1".into(),
port: 8080
}
.display_name(),
"tcp:127.0.0.1:8080"
);
println!("✅ RawAppSource display_name running...");
}
#[test]
fn test_uclient_engine_trait_object() {
let config = make_config();
let client: Box<dyn UClientEngine> = Box::new(RawAppClient::new(config));
assert!(!client.is_active());
println!("✅ UClientEngine trait object running...");
}
}