1pub mod metrics;
19pub mod tracing;
20pub mod health;
21pub mod autoscale;
22
23pub use metrics::{Metrics, MetricsSnapshot};
24pub use tracing::{TraceContext, Tracer, Span, CompletedSpan};
25pub use health::{HealthCheck, HealthStatus, HealthReport};
26pub use autoscale::{AutoScaler, ScalingConfig, ScalingDecision, ResourceLimits};
27
28use std::future::Future;
29use std::pin::Pin;
30use std::task::{Context, Poll, Wake};
31use std::sync::{Arc, Mutex, Condvar, atomic::{AtomicBool, AtomicUsize, Ordering}};
32use std::collections::VecDeque;
33use std::thread;
34use std::time::{Duration, Instant};
35
36type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
37
38#[derive(Clone, Debug)]
40pub struct RuntimeConfig {
41 pub num_threads: Option<usize>,
42 pub enable_autoscaling: bool,
43 pub scaling_config: ScalingConfig,
44 pub resource_limits: ResourceLimits,
45}
46
47impl Default for RuntimeConfig {
48 fn default() -> Self {
49 Self {
50 num_threads: None,
51 enable_autoscaling: false,
52 scaling_config: ScalingConfig::default(),
53 resource_limits: ResourceLimits::default(),
54 }
55 }
56}
57
58pub struct JoinHandle<T> {
60 result: Arc<Mutex<Option<T>>>,
61 completed: Arc<AtomicBool>,
62}
63
64impl<T> JoinHandle<T> {
65 pub async fn await_result(self) -> Option<T> {
67 while !self.completed.load(Ordering::Acquire) {
68 yield_now().await;
69 }
70 self.result.lock().unwrap().take()
71 }
72}
73
74pub struct Runtime {
75 queue: Arc<Mutex<VecDeque<Task>>>,
76 shutdown: Arc<AtomicBool>,
77 task_count: Arc<AtomicUsize>,
78 condvar: Arc<Condvar>,
79 metrics: Metrics,
80 health: HealthCheck,
81 tracer: Tracer,
82 autoscaler: Option<AutoScaler>,
83 resource_limits: ResourceLimits,
84}
85
86impl Runtime {
87 pub fn new() -> Self {
89 Self::with_config(RuntimeConfig::default())
90 }
91
92 pub fn with_config(config: RuntimeConfig) -> Self {
94 let metrics = Metrics::new();
95 let health = HealthCheck::new();
96 let tracer = Tracer::new();
97
98 let autoscaler = if config.enable_autoscaling {
99 Some(AutoScaler::new(config.scaling_config))
100 } else {
101 None
102 };
103
104 let num_threads = config.num_threads.unwrap_or_else(|| {
105 std::thread::available_parallelism()
106 .map(|n| n.get())
107 .unwrap_or(4)
108 });
109
110 metrics.set_thread_count(num_threads);
111
112 Self {
113 queue: Arc::new(Mutex::new(VecDeque::new())),
114 shutdown: Arc::new(AtomicBool::new(false)),
115 task_count: Arc::new(AtomicUsize::new(0)),
116 condvar: Arc::new(Condvar::new()),
117 metrics,
118 health,
119 tracer,
120 autoscaler,
121 resource_limits: config.resource_limits,
122 }
123 }
124
125 pub fn metrics(&self) -> &Metrics {
127 &self.metrics
128 }
129
130 pub fn health(&self) -> &HealthCheck {
132 &self.health
133 }
134
135 pub fn tracer(&self) -> &Tracer {
137 &self.tracer
138 }
139 pub fn task_count(&self) -> usize {
141 self.task_count.load(Ordering::Relaxed)
142 }
143
144 pub fn shutdown(&self) {
146 self.shutdown.store(true, Ordering::Release);
147 self.health.set_ready(false);
148 self.condvar.notify_all();
149 }
150
151 pub fn spawn<F>(&self, future: F)
153 where
154 F: Future<Output = ()> + Send + 'static,
155 {
156 let queue_len = {
158 let queue = self.queue.lock().unwrap();
159 queue.len()
160 };
161
162 if self.resource_limits.is_queue_size_exceeded(queue_len) {
163 self.health.add_check(
164 "queue_limit",
165 HealthStatus::Degraded,
166 format!("Queue size {} exceeds limit", queue_len),
167 );
168 return;
169 }
170
171 self.metrics.task_spawned();
172 self.task_count.fetch_add(1, Ordering::Relaxed);
173 let task_count = Arc::clone(&self.task_count);
174 let condvar = Arc::clone(&self.condvar);
175 let metrics = self.metrics.clone();
176 let start_time = Instant::now();
177
178 let wrapped = async move {
179 future.await;
180 let execution_time = start_time.elapsed();
181 metrics.task_completed(execution_time);
182 task_count.fetch_sub(1, Ordering::Relaxed);
183 condvar.notify_all();
184 };
185
186 let mut queue = self.queue.lock().unwrap();
187 queue.push_back(Box::pin(wrapped));
188 self.metrics.queue_length_changed(queue.len());
189 self.condvar.notify_one();
190 }
191
192 pub fn spawn_with_handle<F, T>(&self, future: F) -> JoinHandle<T>
194 where
195 F: Future<Output = T> + Send + 'static,
196 T: Send + 'static,
197 {
198 let result = Arc::new(Mutex::new(None));
199 let completed = Arc::new(AtomicBool::new(false));
200 let result_clone = Arc::clone(&result);
201 let completed_clone = Arc::clone(&completed);
202
203 let task = async move {
204 let output = future.await;
205 *result_clone.lock().unwrap() = Some(output);
206 completed_clone.store(true, Ordering::Release);
207 };
208
209 self.spawn(task);
210 JoinHandle { result, completed }
211 }
212
213 pub fn block_on<F, T>(&self, future: F) -> T
214 where
215 F: Future<Output = T> + Send + 'static,
216 T: Send + 'static,
217 {
218 let result = Arc::new(Mutex::new(None));
219 let result_clone = Arc::clone(&result);
220
221 let task = async move {
222 let output = future.await;
223 *result_clone.lock().unwrap() = Some(output);
224 };
225
226 self.spawn(Box::pin(task));
227 self.run();
228
229 Arc::try_unwrap(result)
230 .ok()
231 .and_then(|m| m.into_inner().ok())
232 .and_then(|opt| opt)
233 .expect("Task did not complete")
234 }
235
236 fn run(&self) {
237 let num_threads = std::thread::available_parallelism()
238 .map(|n| n.get())
239 .unwrap_or(4);
240
241 self.health.set_alive(true);
242 self.health.set_ready(true);
243
244 let mut handles = vec![];
245
246 for thread_id in 0..num_threads {
247 let queue = Arc::clone(&self.queue);
248 let shutdown = Arc::clone(&self.shutdown);
249 let task_count = Arc::clone(&self.task_count);
250 let condvar = Arc::clone(&self.condvar);
251 let metrics = self.metrics.clone();
252 let health = self.health.clone();
253
254 let handle = thread::spawn(move || {
255 let waker = Arc::new(RuntimeWaker { condvar: Arc::clone(&condvar) }).into();
256
257 loop {
258 health.heartbeat();
259
260 if shutdown.load(Ordering::Acquire) && task_count.load(Ordering::Relaxed) == 0 {
261 break;
262 }
263
264 let task = {
265 let mut q = queue.lock().unwrap();
266 if q.is_empty() && !shutdown.load(Ordering::Acquire) {
267 metrics.thread_idle();
268 q = condvar.wait_timeout(q, Duration::from_millis(100)).unwrap().0;
269 metrics.thread_active();
270 }
271 let task = q.pop_front();
272 metrics.queue_length_changed(q.len());
273 task
274 };
275
276 match task {
277 Some(mut task) => {
278 metrics.thread_active();
279 let mut context = Context::from_waker(&waker);
280 match task.as_mut().poll(&mut context) {
281 Poll::Ready(()) => {},
282 Poll::Pending => {
283 let mut q = queue.lock().unwrap();
284 q.push_back(task);
285 metrics.queue_length_changed(q.len());
286 }
287 }
288 }
289 None if shutdown.load(Ordering::Acquire) => break,
290 None => {}
291 }
292 }
293 });
294 handles.push(handle);
295 }
296
297 for handle in handles {
298 let _ = handle.join();
299 }
300
301 self.health.set_alive(false);
302 }
303}
304
305impl Default for Runtime {
306 fn default() -> Self {
307 Self::new()
308 }
309}
310
311struct RuntimeWaker {
312 condvar: Arc<Condvar>,
313}
314
315impl Wake for RuntimeWaker {
316 fn wake(self: Arc<Self>) {
317 self.condvar.notify_one();
318 }
319
320 fn wake_by_ref(self: &Arc<Self>) {
321 self.condvar.notify_one();
322 }
323}
324
325pub fn spawn<F>(future: F)
327where
328 F: Future<Output = ()> + Send + 'static,
329{
330 RUNTIME.with(|rt| {
331 rt.borrow().spawn(future);
332 });
333}
334
335thread_local! {
336 static RUNTIME: std::cell::RefCell<Runtime> = std::cell::RefCell::new(Runtime::new());
337}
338
339#[macro_export]
341macro_rules! main {
342 ($($body:tt)*) => {
343 fn main() {
344 let rt = $crate::Runtime::new();
345 rt.block_on(async { $($body)* });
346 }
347 };
348}
349
350pub async fn yield_now() {
352 struct YieldNow {
353 yielded: bool,
354 }
355
356 impl Future for YieldNow {
357 type Output = ();
358
359 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
360 if self.yielded {
361 Poll::Ready(())
362 } else {
363 self.yielded = true;
364 cx.waker().wake_by_ref();
365 Poll::Pending
366 }
367 }
368 }
369
370 YieldNow { yielded: false }.await
371}
372
373pub async fn sleep(duration: Duration) {
375 struct Sleep {
376 when: std::time::Instant,
377 }
378
379 impl Future for Sleep {
380 type Output = ();
381
382 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
383 if std::time::Instant::now() >= self.when {
384 Poll::Ready(())
385 } else {
386 cx.waker().wake_by_ref();
387 Poll::Pending
388 }
389 }
390 }
391
392 Sleep {
393 when: std::time::Instant::now() + duration,
394 }
395 .await
396}
397
398pub async fn timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
400where
401 F: Future<Output = T>,
402{
403 struct Timeout<F> {
404 future: Pin<Box<F>>,
405 deadline: Instant,
406 }
407
408 impl<F: Future> Future for Timeout<F> {
409 type Output = Result<F::Output, TimeoutError>;
410
411 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
412 if Instant::now() >= self.deadline {
413 return Poll::Ready(Err(TimeoutError));
414 }
415
416 match self.future.as_mut().poll(cx) {
417 Poll::Ready(v) => Poll::Ready(Ok(v)),
418 Poll::Pending => {
419 cx.waker().wake_by_ref();
420 Poll::Pending
421 }
422 }
423 }
424 }
425
426 Timeout {
427 future: Box::pin(future),
428 deadline: Instant::now() + duration,
429 }
430 .await
431}
432
433#[derive(Debug, Clone, Copy, PartialEq, Eq)]
435pub struct TimeoutError;
436
437impl std::fmt::Display for TimeoutError {
438 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439 write!(f, "operation timed out")
440 }
441}
442
443impl std::error::Error for TimeoutError {}
444
445pub mod channel {
447 use std::sync::{Arc, Mutex, Condvar};
448 use std::collections::VecDeque;
449
450 pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
452 let inner = Arc::new(ChannelInner {
453 queue: Mutex::new(VecDeque::with_capacity(capacity)),
454 condvar: Condvar::new(),
455 capacity,
456 closed: Mutex::new(false),
457 });
458 (Sender { inner: inner.clone() }, Receiver { inner })
459 }
460
461 pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
463 bounded(usize::MAX)
464 }
465
466 struct ChannelInner<T> {
467 queue: Mutex<VecDeque<T>>,
468 condvar: Condvar,
469 capacity: usize,
470 closed: Mutex<bool>,
471 }
472
473 pub struct Sender<T> {
475 inner: Arc<ChannelInner<T>>,
476 }
477
478 impl<T> Sender<T> {
479 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
481 if *self.inner.closed.lock().unwrap() {
482 return Err(SendError(value));
483 }
484
485 loop {
486 let mut queue = self.inner.queue.lock().unwrap();
487 if queue.len() < self.inner.capacity {
488 queue.push_back(value);
489 self.inner.condvar.notify_one();
490 return Ok(());
491 }
492 drop(queue);
493 let queue = self.inner.queue.lock().unwrap();
494 let _guard = self.inner.condvar.wait(queue).unwrap();
495 }
496 }
497 }
498
499 impl<T> Clone for Sender<T> {
500 fn clone(&self) -> Self {
501 Self { inner: self.inner.clone() }
502 }
503 }
504
505 impl<T> Drop for Sender<T> {
506 fn drop(&mut self) {
507 if Arc::strong_count(&self.inner) == 2 {
508 *self.inner.closed.lock().unwrap() = true;
509 self.inner.condvar.notify_all();
510 }
511 }
512 }
513
514 pub struct Receiver<T> {
516 inner: Arc<ChannelInner<T>>,
517 }
518
519 impl<T> Receiver<T> {
520 pub async fn recv(&self) -> Option<T> {
522 loop {
523 let mut queue = self.inner.queue.lock().unwrap();
524 if let Some(value) = queue.pop_front() {
525 self.inner.condvar.notify_one();
526 return Some(value);
527 }
528 if *self.inner.closed.lock().unwrap() && queue.is_empty() {
529 return None;
530 }
531 drop(queue);
532 let queue = self.inner.queue.lock().unwrap();
533 let _guard = self.inner.condvar.wait(queue).unwrap();
534 }
535 }
536 }
537
538 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
540 pub struct SendError<T>(pub T);
541
542 impl<T> std::fmt::Display for SendError<T> {
543 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
544 write!(f, "channel closed")
545 }
546 }
547
548 impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
549}
550
551pub mod net {
553 use std::io;
554 use std::net::{TcpListener as StdListener, TcpStream as StdStream, SocketAddr};
555
556 pub struct TcpListener(StdListener);
557 pub struct TcpStream(StdStream);
558
559 impl TcpListener {
560 pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
561 let listener = StdListener::bind(addr)?;
562 listener.set_nonblocking(true)?;
563 Ok(Self(listener))
564 }
565
566 pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
567 loop {
568 match self.0.accept() {
569 Ok((stream, addr)) => {
570 stream.set_nonblocking(true)?;
571 return Ok((TcpStream(stream), addr));
572 }
573 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
574 crate::sleep(std::time::Duration::from_millis(10)).await;
575 }
576 Err(e) => return Err(e),
577 }
578 }
579 }
580 }
581
582 impl TcpStream {
583 pub async fn connect(addr: SocketAddr) -> io::Result<Self> {
584 let stream = StdStream::connect(addr)?;
585 stream.set_nonblocking(true)?;
586 Ok(Self(stream))
587 }
588
589 pub fn into_std(self) -> StdStream {
590 self.0
591 }
592
593 pub fn as_std(&self) -> &StdStream {
594 &self.0
595 }
596
597 pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
599 use std::io::Read;
600 loop {
601 match self.0.read(buf) {
602 Ok(n) => return Ok(n),
603 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
604 crate::sleep(std::time::Duration::from_millis(1)).await;
605 }
606 Err(e) => return Err(e),
607 }
608 }
609 }
610
611 pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
613 use std::io::Write;
614 loop {
615 match self.0.write(buf) {
616 Ok(n) => return Ok(n),
617 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
618 crate::sleep(std::time::Duration::from_millis(1)).await;
619 }
620 Err(e) => return Err(e),
621 }
622 }
623 }
624
625 pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
627 while !buf.is_empty() {
628 let n = self.write(buf).await?;
629 buf = &buf[n..];
630 }
631 Ok(())
632 }
633 }
634}
635
636pub mod io {
638 use std::io::{self, Read, Write};
639
640 pub async fn copy<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
641 let mut buf = [0u8; 8192];
642 let mut total = 0u64;
643
644 loop {
645 match reader.read(&mut buf) {
646 Ok(0) => return Ok(total),
647 Ok(n) => {
648 writer.write_all(&buf[..n])?;
649 total += n as u64;
650 }
651 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
652 crate::sleep(std::time::Duration::from_millis(1)).await;
653 }
654 Err(e) => return Err(e),
655 }
656 }
657 }
658}