avila_async/
lib.rs

//! Avila Async - Native async runtime
//! Tokio alternative - 100% Rust std
//!
//! # Industry 4.0 Features
//! - Work-stealing scheduler with auto-scaling
//! - Multi-threaded executor with resource management
//! - Real-time metrics and monitoring
//! - Distributed tracing support
//! - Health checks (readiness & liveness)
//! - Async I/O primitives
//! - Channel support
//! - Task cancellation
//! - Graceful shutdown
//! - Prometheus metrics export
//! - Zero external dependencies
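//!
//! # Example
//!
//! A minimal usage sketch of the `Runtime` defined below (marked `ignore` so it
//! is not run as a doctest; the calls mirror the items declared in this file):
//!
//! ```ignore
//! let rt = avila_async::Runtime::new();
//! let answer = rt.block_on(async { 40 + 2 });
//! assert_eq!(answer, 42);
//! ```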
// Core modules
pub mod metrics;
pub mod tracing;
pub mod health;
pub mod autoscale;

pub use metrics::{Metrics, MetricsSnapshot};
pub use tracing::{TraceContext, Tracer, Span, CompletedSpan};
pub use health::{HealthCheck, HealthStatus, HealthReport};
pub use autoscale::{AutoScaler, ScalingConfig, ScalingDecision, ResourceLimits};

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Wake};
use std::sync::{Arc, Mutex, Condvar, atomic::{AtomicBool, AtomicUsize, Ordering}};
use std::collections::VecDeque;
use std::thread;
use std::time::{Duration, Instant};

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Runtime configuration for Industry 4.0 features
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    pub num_threads: Option<usize>,
    pub enable_autoscaling: bool,
    pub scaling_config: ScalingConfig,
    pub resource_limits: ResourceLimits,
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            num_threads: None,
            enable_autoscaling: false,
            scaling_config: ScalingConfig::default(),
            resource_limits: ResourceLimits::default(),
        }
    }
}
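
// A minimal sketch (kept as an unused helper, not part of the public API) of
// building a customized configuration; the field values and the helper name
// `_example_custom_config` are illustrative only.
#[allow(dead_code)]
fn _example_custom_config() -> Runtime {
    let config = RuntimeConfig {
        num_threads: Some(8),
        enable_autoscaling: true,
        ..RuntimeConfig::default()
    };
    Runtime::with_config(config)
}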

/// Task handle for spawned futures
pub struct JoinHandle<T> {
    result: Arc<Mutex<Option<T>>>,
    completed: Arc<AtomicBool>,
}

impl<T> JoinHandle<T> {
    /// Wait for the task to complete and return its result
    pub async fn await_result(self) -> Option<T> {
        while !self.completed.load(Ordering::Acquire) {
            yield_now().await;
        }
        self.result.lock().unwrap().take()
    }
}

pub struct Runtime {
    queue: Arc<Mutex<VecDeque<Task>>>,
    shutdown: Arc<AtomicBool>,
    task_count: Arc<AtomicUsize>,
    condvar: Arc<Condvar>,
    metrics: Metrics,
    health: HealthCheck,
    tracer: Tracer,
    autoscaler: Option<AutoScaler>,
    resource_limits: ResourceLimits,
    /// Worker thread count resolved from `RuntimeConfig` at construction time.
    num_threads: usize,
}

impl Runtime {
    /// Create a new runtime instance
    pub fn new() -> Self {
        Self::with_config(RuntimeConfig::default())
    }

    /// Create runtime with custom configuration
    pub fn with_config(config: RuntimeConfig) -> Self {
        let metrics = Metrics::new();
        let health = HealthCheck::new();
        let tracer = Tracer::new();

        let autoscaler = if config.enable_autoscaling {
            Some(AutoScaler::new(config.scaling_config))
        } else {
            None
        };

        let num_threads = config.num_threads.unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(4)
        });

        metrics.set_thread_count(num_threads);

        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            shutdown: Arc::new(AtomicBool::new(false)),
            task_count: Arc::new(AtomicUsize::new(0)),
            condvar: Arc::new(Condvar::new()),
            metrics,
            health,
            tracer,
            autoscaler,
            resource_limits: config.resource_limits,
            num_threads,
        }
    }

    /// Get metrics collector
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    /// Get health checker
    pub fn health(&self) -> &HealthCheck {
        &self.health
    }

    /// Get tracer
    pub fn tracer(&self) -> &Tracer {
        &self.tracer
    }

    /// Get the number of active tasks
    pub fn task_count(&self) -> usize {
        self.task_count.load(Ordering::Relaxed)
    }

    /// Initiate graceful shutdown
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
        self.health.set_ready(false);
        self.condvar.notify_all();
    }

    /// Spawn a future onto the runtime
    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // Check resource limits
        let queue_len = {
            let queue = self.queue.lock().unwrap();
            queue.len()
        };

        if self.resource_limits.is_queue_size_exceeded(queue_len) {
            self.health.add_check(
                "queue_limit",
                HealthStatus::Degraded,
                format!("Queue size {} exceeds limit", queue_len),
            );
            return;
        }

        self.metrics.task_spawned();
        self.task_count.fetch_add(1, Ordering::Relaxed);
        let task_count = Arc::clone(&self.task_count);
        let condvar = Arc::clone(&self.condvar);
        let metrics = self.metrics.clone();
        let start_time = Instant::now();

        let wrapped = async move {
            future.await;
            let execution_time = start_time.elapsed();
            metrics.task_completed(execution_time);
            task_count.fetch_sub(1, Ordering::Relaxed);
            condvar.notify_all();
        };

        let mut queue = self.queue.lock().unwrap();
        queue.push_back(Box::pin(wrapped));
        self.metrics.queue_length_changed(queue.len());
        self.condvar.notify_one();
    }

    /// Spawn a future and return a handle to await its result
    pub fn spawn_with_handle<F, T>(&self, future: F) -> JoinHandle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let result = Arc::new(Mutex::new(None));
        let completed = Arc::new(AtomicBool::new(false));
        let result_clone = Arc::clone(&result);
        let completed_clone = Arc::clone(&completed);

        let task = async move {
            let output = future.await;
            *result_clone.lock().unwrap() = Some(output);
            completed_clone.store(true, Ordering::Release);
        };

        self.spawn(task);
        JoinHandle { result, completed }
    }

    /// Run a future to completion, driving the worker threads until it finishes
    pub fn block_on<F, T>(&self, future: F) -> T
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let result = Arc::new(Mutex::new(None));
        let result_clone = Arc::clone(&result);
        let shutdown = Arc::clone(&self.shutdown);
        let condvar = Arc::clone(&self.condvar);

        let task = async move {
            let output = future.await;
            *result_clone.lock().unwrap() = Some(output);
            // Signal the workers to wind down once the root future is done,
            // otherwise `run` would never return.
            shutdown.store(true, Ordering::Release);
            condvar.notify_all();
        };

        self.spawn(task);
        self.run();

        Arc::try_unwrap(result)
            .ok()
            .and_then(|m| m.into_inner().ok())
            .and_then(|opt| opt)
            .expect("Task did not complete")
    }

    fn run(&self) {
        // Use the thread count resolved from the runtime configuration.
        let num_threads = self.num_threads;

        self.health.set_alive(true);
        self.health.set_ready(true);

        let mut handles = vec![];

        for _thread_id in 0..num_threads {
            let queue = Arc::clone(&self.queue);
            let shutdown = Arc::clone(&self.shutdown);
            let task_count = Arc::clone(&self.task_count);
            let condvar = Arc::clone(&self.condvar);
            let metrics = self.metrics.clone();
            let health = self.health.clone();

            let handle = thread::spawn(move || {
                let waker: std::task::Waker =
                    Arc::new(RuntimeWaker { condvar: Arc::clone(&condvar) }).into();

                loop {
                    health.heartbeat();

                    if shutdown.load(Ordering::Acquire) && task_count.load(Ordering::Relaxed) == 0 {
                        break;
                    }

                    let task = {
                        let mut q = queue.lock().unwrap();
                        if q.is_empty() && !shutdown.load(Ordering::Acquire) {
                            metrics.thread_idle();
                            q = condvar.wait_timeout(q, Duration::from_millis(100)).unwrap().0;
                            metrics.thread_active();
                        }
                        let task = q.pop_front();
                        metrics.queue_length_changed(q.len());
                        task
                    };

                    match task {
                        Some(mut task) => {
                            metrics.thread_active();
                            let mut context = Context::from_waker(&waker);
                            match task.as_mut().poll(&mut context) {
                                Poll::Ready(()) => {},
                                Poll::Pending => {
                                    // Re-queue pending tasks so a worker can poll them again.
                                    let mut q = queue.lock().unwrap();
                                    q.push_back(task);
                                    metrics.queue_length_changed(q.len());
                                }
                            }
                        }
                        None if shutdown.load(Ordering::Acquire) => break,
                        None => {}
                    }
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            let _ = handle.join();
        }

        self.health.set_alive(false);
    }
}
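
// A minimal end-to-end sketch of the runtime API above: spawn a task with a
// handle, then drive everything to completion from `block_on`. The helper name
// `_example_runtime_usage` is illustrative and not referenced elsewhere.
#[allow(dead_code)]
fn _example_runtime_usage() -> Option<u32> {
    let rt = Runtime::new();
    let handle = rt.spawn_with_handle(async { 2 + 2 });
    rt.block_on(async move { handle.await_result().await })
}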

impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

struct RuntimeWaker {
    condvar: Arc<Condvar>,
}

impl Wake for RuntimeWaker {
    fn wake(self: Arc<Self>) {
        self.condvar.notify_one();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.condvar.notify_one();
    }
}

// Global helper: spawns onto the current thread's thread-local runtime
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    RUNTIME.with(|rt| {
        rt.borrow().spawn(future);
    });
}

thread_local! {
    static RUNTIME: std::cell::RefCell<Runtime> = std::cell::RefCell::new(Runtime::new());
}

// Macro for async main
#[macro_export]
macro_rules! main {
    ($($body:tt)*) => {
        fn main() {
            let rt = $crate::Runtime::new();
            rt.block_on(async { $($body)* });
        }
    };
}
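
// Sketch of how the `main!` macro above is intended to be used from a binary
// crate (assuming the crate is published under the name `avila_async`):
//
//     avila_async::main! {
//         println!("running inside the Avila runtime");
//     }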

/// Yield execution to allow other tasks to run
pub async fn yield_now() {
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.yielded {
                Poll::Ready(())
            } else {
                self.yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    YieldNow { yielded: false }.await
}

/// Sleep for a specified duration
pub async fn sleep(duration: Duration) {
    struct Sleep {
        when: std::time::Instant,
    }

    impl Future for Sleep {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if std::time::Instant::now() >= self.when {
                Poll::Ready(())
            } else {
                // Busy-wakes: the task is re-queued until the deadline passes.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    Sleep {
        when: std::time::Instant::now() + duration,
    }
    .await
}

/// Execute a future with a timeout
pub async fn timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
    F: Future<Output = T>,
{
    struct Timeout<F> {
        future: Pin<Box<F>>,
        deadline: Instant,
    }

    impl<F: Future> Future for Timeout<F> {
        type Output = Result<F::Output, TimeoutError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if Instant::now() >= self.deadline {
                return Poll::Ready(Err(TimeoutError));
            }

            match self.future.as_mut().poll(cx) {
                Poll::Ready(v) => Poll::Ready(Ok(v)),
                Poll::Pending => {
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }
        }
    }

    Timeout {
        future: Box::pin(future),
        deadline: Instant::now() + duration,
    }
    .await
}

/// Timeout error type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutError;

impl std::fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "operation timed out")
    }
}

impl std::error::Error for TimeoutError {}
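
// A small sketch of `timeout` guarding a slow operation, built from the `sleep`
// and `timeout` helpers above; the durations and the helper name
// `_example_timeout` are illustrative only.
#[allow(dead_code)]
async fn _example_timeout() -> Result<&'static str, TimeoutError> {
    timeout(Duration::from_millis(50), async {
        sleep(Duration::from_millis(10)).await;
        "finished in time"
    })
    .await
}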

/// Async channel for message passing
pub mod channel {
    use std::sync::{Arc, Mutex, Condvar};
    use std::collections::VecDeque;

    /// Create a bounded channel with specified capacity
    pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
        let inner = Arc::new(ChannelInner {
            // Cap the pre-allocation (pragmatically at 1024 slots) so huge
            // capacities such as the `unbounded` channel below do not attempt
            // an enormous up-front allocation.
            queue: Mutex::new(VecDeque::with_capacity(capacity.min(1024))),
            condvar: Condvar::new(),
            capacity,
            closed: Mutex::new(false),
        });
        (Sender { inner: inner.clone() }, Receiver { inner })
    }

    /// Create an unbounded channel
    pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
        bounded(usize::MAX)
    }

    struct ChannelInner<T> {
        queue: Mutex<VecDeque<T>>,
        condvar: Condvar,
        capacity: usize,
        closed: Mutex<bool>,
    }

    /// Sender half of a channel
    pub struct Sender<T> {
        inner: Arc<ChannelInner<T>>,
    }

    impl<T> Sender<T> {
        /// Send a value through the channel
        pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
            if *self.inner.closed.lock().unwrap() {
                return Err(SendError(value));
            }

            let mut queue = self.inner.queue.lock().unwrap();
            loop {
                if queue.len() < self.inner.capacity {
                    queue.push_back(value);
                    self.inner.condvar.notify_one();
                    return Ok(());
                }
                // Wait on the guard we already hold so a notification arriving
                // between the capacity check and the wait cannot be lost.
                queue = self.inner.condvar.wait(queue).unwrap();
            }
        }
    }

    impl<T> Clone for Sender<T> {
        fn clone(&self) -> Self {
            Self { inner: self.inner.clone() }
        }
    }

    impl<T> Drop for Sender<T> {
        fn drop(&mut self) {
            // A strong count of 2 means only this sender and the single
            // receiver still hold the channel, so this is the last sender.
            if Arc::strong_count(&self.inner) == 2 {
                *self.inner.closed.lock().unwrap() = true;
                self.inner.condvar.notify_all();
            }
        }
    }

    /// Receiver half of a channel
    pub struct Receiver<T> {
        inner: Arc<ChannelInner<T>>,
    }

    impl<T> Receiver<T> {
        /// Receive a value from the channel
        pub async fn recv(&self) -> Option<T> {
            let mut queue = self.inner.queue.lock().unwrap();
            loop {
                if let Some(value) = queue.pop_front() {
                    self.inner.condvar.notify_one();
                    return Some(value);
                }
                if *self.inner.closed.lock().unwrap() {
                    return None;
                }
                // Wait on the held guard so a send that happens between the
                // emptiness check and the wait is not missed.
                queue = self.inner.condvar.wait(queue).unwrap();
            }
        }
    }

    /// Error returned when sending fails
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct SendError<T>(pub T);

    impl<T> std::fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "channel closed")
        }
    }

    impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
}
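
// A minimal sketch of the bounded channel above: one producer task spawned on
// the runtime, one consumer drained from `block_on`. Capacity and values are
// illustrative, and the helper name `_example_channel` is not used elsewhere.
#[allow(dead_code)]
fn _example_channel() -> Vec<u32> {
    let rt = Runtime::new();
    let (tx, rx) = channel::bounded(8);
    rt.spawn(async move {
        for i in 0..4u32 {
            let _ = tx.send(i).await;
        }
        // `tx` is dropped here, which closes the channel for the receiver.
    });
    rt.block_on(async move {
        let mut received = Vec::new();
        while let Some(v) = rx.recv().await {
            received.push(v);
        }
        received
    })
}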

// Basic network modules
pub mod net {
    use std::io;
    use std::net::{TcpListener as StdListener, TcpStream as StdStream, SocketAddr};

    pub struct TcpListener(StdListener);
    pub struct TcpStream(StdStream);

    impl TcpListener {
        pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
            let listener = StdListener::bind(addr)?;
            listener.set_nonblocking(true)?;
            Ok(Self(listener))
        }

        pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
            loop {
                match self.0.accept() {
                    Ok((stream, addr)) => {
                        stream.set_nonblocking(true)?;
                        return Ok((TcpStream(stream), addr));
                    }
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        crate::sleep(std::time::Duration::from_millis(10)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        }
    }

    impl TcpStream {
        pub async fn connect(addr: SocketAddr) -> io::Result<Self> {
            let stream = StdStream::connect(addr)?;
            stream.set_nonblocking(true)?;
            Ok(Self(stream))
        }

        pub fn into_std(self) -> StdStream {
            self.0
        }

        pub fn as_std(&self) -> &StdStream {
            &self.0
        }

        /// Read data from the stream
        pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            use std::io::Read;
            loop {
                match self.0.read(buf) {
                    Ok(n) => return Ok(n),
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        crate::sleep(std::time::Duration::from_millis(1)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        }

        /// Write data to the stream
        pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            use std::io::Write;
            loop {
                match self.0.write(buf) {
                    Ok(n) => return Ok(n),
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        crate::sleep(std::time::Duration::from_millis(1)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        }

        /// Write all data to the stream
        pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
            while !buf.is_empty() {
                let n = self.write(buf).await?;
                if n == 0 {
                    // A zero-length write would otherwise loop forever.
                    return Err(io::ErrorKind::WriteZero.into());
                }
                buf = &buf[n..];
            }
            Ok(())
        }
    }
}
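
// A sketch of the `net` wrappers above: accept a single connection and echo one
// read back to the peer. The bind address and the helper name
// `_example_echo_once` are illustrative only.
#[allow(dead_code)]
async fn _example_echo_once() -> std::io::Result<()> {
    let addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let listener = net::TcpListener::bind(addr).await?;
    let (mut stream, _peer) = listener.accept().await?;
    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf).await?;
    stream.write_all(&buf[..n]).await
}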

// Basic I/O module
pub mod io {
    use std::io::{self, Read, Write};

    pub async fn copy<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
        let mut buf = [0u8; 8192];
        let mut total = 0u64;

        loop {
            match reader.read(&mut buf) {
                Ok(0) => return Ok(total),
                Ok(n) => {
                    writer.write_all(&buf[..n])?;
                    total += n as u64;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    crate::sleep(std::time::Duration::from_millis(1)).await;
                }
                Err(e) => return Err(e),
            }
        }
    }
}
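
// A small sketch of `io::copy` between in-memory buffers, driven by `block_on`;
// purely illustrative, and the helper name `_example_io_copy` is unused.
#[allow(dead_code)]
fn _example_io_copy() -> std::io::Result<u64> {
    let rt = Runtime::new();
    rt.block_on(async {
        let mut src: &[u8] = b"hello";
        let mut dst: Vec<u8> = Vec::new();
        io::copy(&mut src, &mut dst).await
    })
}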