pub mod metrics;
pub mod tracing;
pub mod health;
pub mod autoscale;

pub mod ai;
pub mod digital_twin;
pub mod edge;

pub mod quantum;
pub mod neuro;
pub mod blockchain;
pub mod crypto;
pub mod genomic;

pub use metrics::{Metrics, MetricsSnapshot};
pub use tracing::{TraceContext, Tracer, Span, CompletedSpan};
pub use health::{HealthCheck, HealthStatus, HealthReport};
pub use autoscale::{AutoScaler, ScalingConfig, ScalingDecision, ResourceLimits};
pub use ai::{WorkloadPredictor, AnomalyDetector, PerformanceOptimizer};
pub use digital_twin::{DigitalTwin, TwinSnapshot, TwinUpdate};
pub use edge::{EdgeManager, EdgeNode, DistributionStrategy, TaskDistribution};
pub use quantum::{QuantumScheduler, SchedulingDecision, QuantumStats};
pub use neuro::{NeuralNetwork, RecurrentNetwork, NetworkStats};
pub use blockchain::{RuntimeBlockchain, Block, Transaction, TransactionType, ConsensusManager};
pub use crypto::{CryptoService, SecureChannel, CryptoStats};
pub use genomic::{GeneticOptimizer, Genome, GeneticStats};

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Wake};
use std::sync::{Arc, Mutex, Condvar, atomic::{AtomicBool, AtomicUsize, Ordering}};
use std::collections::VecDeque;
use std::thread;
use std::time::{Duration, Instant};

type Task = Pin<Box<dyn Future<Output = ()> + Send>>;

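/// Configuration for [`Runtime`]. A minimal construction sketch (all fields are
/// defined below; `Some(4)` is just an illustrative thread count):
///
/// ```ignore
/// let config = RuntimeConfig {
///     num_threads: Some(4),
///     ..RuntimeConfig::default()
/// };
/// let rt = Runtime::with_config(config);
/// ```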
#[derive(Clone, Debug)]
pub struct RuntimeConfig {
    pub num_threads: Option<usize>,
    pub enable_autoscaling: bool,
    pub scaling_config: ScalingConfig,
    pub resource_limits: ResourceLimits,
}

impl Default for RuntimeConfig {
    fn default() -> Self {
        Self {
            num_threads: None,
            enable_autoscaling: false,
            scaling_config: ScalingConfig::default(),
            resource_limits: ResourceLimits::default(),
        }
    }
}

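/// Handle to a task spawned with [`Runtime::spawn_with_handle`]. Awaiting
/// [`JoinHandle::await_result`] cooperatively yields until the task has
/// completed, then returns its output (or `None` if the result was already taken).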
pub struct JoinHandle<T> {
    result: Arc<Mutex<Option<T>>>,
    completed: Arc<AtomicBool>,
}

impl<T> JoinHandle<T> {
    pub async fn await_result(self) -> Option<T> {
        // Cooperatively spin: yield back to the executor until the task
        // flags completion, then take the stored result.
        while !self.completed.load(Ordering::Acquire) {
            yield_now().await;
        }
        self.result.lock().unwrap().take()
    }
}

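/// A minimal multi-threaded executor with built-in metrics, health checks and
/// tracing. Typical usage (a sketch; see [`Runtime::block_on`] below):
///
/// ```ignore
/// let rt = Runtime::new();
/// let answer = rt.block_on(async { 40 + 2 });
/// assert_eq!(answer, 42);
/// ```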
pub struct Runtime {
    queue: Arc<Mutex<VecDeque<Task>>>,
    shutdown: Arc<AtomicBool>,
    task_count: Arc<AtomicUsize>,
    condvar: Arc<Condvar>,
    metrics: Metrics,
    health: HealthCheck,
    tracer: Tracer,
    #[allow(dead_code)]
    autoscaler: Option<AutoScaler>,
    resource_limits: ResourceLimits,
    // Number of worker threads, resolved once from the config.
    num_threads: usize,
}

impl Runtime {
    pub fn new() -> Self {
        Self::with_config(RuntimeConfig::default())
    }

    pub fn with_config(config: RuntimeConfig) -> Self {
        let metrics = Metrics::new();
        let health = HealthCheck::new();
        let tracer = Tracer::new();

        let autoscaler = if config.enable_autoscaling {
            Some(AutoScaler::new(config.scaling_config))
        } else {
            None
        };

        // Default to the host's available parallelism when no explicit
        // thread count is configured.
        let num_threads = config.num_threads.unwrap_or_else(|| {
            std::thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(4)
        });

        metrics.set_thread_count(num_threads);

        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            shutdown: Arc::new(AtomicBool::new(false)),
            task_count: Arc::new(AtomicUsize::new(0)),
            condvar: Arc::new(Condvar::new()),
            metrics,
            health,
            tracer,
            autoscaler,
            resource_limits: config.resource_limits,
            num_threads,
        }
    }

    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }

    pub fn health(&self) -> &HealthCheck {
        &self.health
    }

    pub fn tracer(&self) -> &Tracer {
        &self.tracer
    }

    pub fn task_count(&self) -> usize {
        self.task_count.load(Ordering::Relaxed)
    }

    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
        self.health.set_ready(false);
        self.condvar.notify_all();
    }

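    /// Spawn a fire-and-forget task. If the configured queue limit is exceeded,
    /// the health status is degraded and the task is dropped (see the check below).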
    pub fn spawn<F>(&self, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let queue_len = {
            let queue = self.queue.lock().unwrap();
            queue.len()
        };

        // Backpressure: if the queue limit is exceeded, record a degraded
        // health check and drop the task instead of enqueueing it.
        if self.resource_limits.is_queue_size_exceeded(queue_len) {
            self.health.add_check(
                "queue_limit",
                HealthStatus::Degraded,
                format!("Queue size {} exceeds limit", queue_len),
            );
            return;
        }

        self.metrics.task_spawned();
        self.task_count.fetch_add(1, Ordering::Relaxed);
        let task_count = Arc::clone(&self.task_count);
        let condvar = Arc::clone(&self.condvar);
        let metrics = self.metrics.clone();
        let start_time = Instant::now();

        // Wrap the future so completion updates metrics, decrements the task
        // count and wakes any worker waiting for the count to reach zero.
        let wrapped = async move {
            future.await;
            let execution_time = start_time.elapsed();
            metrics.task_completed(execution_time);
            task_count.fetch_sub(1, Ordering::Relaxed);
            condvar.notify_all();
        };

        let mut queue = self.queue.lock().unwrap();
        queue.push_back(Box::pin(wrapped));
        self.metrics.queue_length_changed(queue.len());
        self.condvar.notify_one();
    }

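    /// Spawn a task and obtain a [`JoinHandle`] for its result. A usage sketch:
    ///
    /// ```ignore
    /// let handle = rt.spawn_with_handle(async { 1 + 1 });
    /// // Inside another task: let result = handle.await_result().await;
    /// ```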
    pub fn spawn_with_handle<F, T>(&self, future: F) -> JoinHandle<T>
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let result = Arc::new(Mutex::new(None));
        let completed = Arc::new(AtomicBool::new(false));
        let result_clone = Arc::clone(&result);
        let completed_clone = Arc::clone(&completed);

        let task = async move {
            let output = future.await;
            *result_clone.lock().unwrap() = Some(output);
            completed_clone.store(true, Ordering::Release);
        };

        self.spawn(task);
        JoinHandle { result, completed }
    }

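    /// Drive the runtime until `future` (and every other spawned task) has
    /// completed, then return the future's output. This call blocks the
    /// current thread while the worker threads run.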
    pub fn block_on<F, T>(&self, future: F) -> T
    where
        F: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        let result = Arc::new(Mutex::new(None));
        let result_clone = Arc::clone(&result);

        let task = async move {
            let output = future.await;
            *result_clone.lock().unwrap() = Some(output);
        };

        self.spawn(task);
        self.run();

        Arc::try_unwrap(result)
            .ok()
            .and_then(|m| m.into_inner().ok())
            .and_then(|opt| opt)
            .expect("Task did not complete")
    }

    fn run(&self) {
        // Use the thread count resolved from the configuration.
        let num_threads = self.num_threads;

        self.health.set_alive(true);
        self.health.set_ready(true);

        let mut handles = vec![];

        for _thread_id in 0..num_threads {
            let queue = Arc::clone(&self.queue);
            let shutdown = Arc::clone(&self.shutdown);
            let task_count = Arc::clone(&self.task_count);
            let condvar = Arc::clone(&self.condvar);
            let metrics = self.metrics.clone();
            let health = self.health.clone();

            let handle = thread::spawn(move || {
                let waker: std::task::Waker =
                    Arc::new(RuntimeWaker { condvar: Arc::clone(&condvar) }).into();

                loop {
                    health.heartbeat();

                    // Exit once shutdown is requested and no tasks remain.
                    if shutdown.load(Ordering::Acquire) && task_count.load(Ordering::Relaxed) == 0 {
                        break;
                    }

                    let task = {
                        let mut q = queue.lock().unwrap();
                        if q.is_empty() && !shutdown.load(Ordering::Acquire) {
                            metrics.thread_idle();
                            q = condvar.wait_timeout(q, Duration::from_millis(100)).unwrap().0;
                            metrics.thread_active();
                        }
                        let task = q.pop_front();
                        metrics.queue_length_changed(q.len());
                        task
                    };

                    match task {
                        Some(mut task) => {
                            metrics.thread_active();
                            let mut context = Context::from_waker(&waker);
                            match task.as_mut().poll(&mut context) {
                                Poll::Ready(()) => {}
                                // Pending tasks are re-queued and polled again later.
                                Poll::Pending => {
                                    let mut q = queue.lock().unwrap();
                                    q.push_back(task);
                                    metrics.queue_length_changed(q.len());
                                }
                            }
                        }
                        None if shutdown.load(Ordering::Acquire) => break,
                        None => {}
                    }
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            let _ = handle.join();
        }

        self.health.set_alive(false);
    }
}

impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

struct RuntimeWaker {
    condvar: Arc<Condvar>,
}

impl Wake for RuntimeWaker {
    fn wake(self: Arc<Self>) {
        self.condvar.notify_one();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.condvar.notify_one();
    }
}

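/// Spawn a task onto the calling thread's runtime (see the `thread_local!`
/// below). Note that this thread-local runtime is only driven when its own
/// `block_on`/`run` is invoked; tasks spawned here are queued until then.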
pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    RUNTIME.with(|rt| {
        rt.borrow().spawn(future);
    });
}

thread_local! {
    static RUNTIME: std::cell::RefCell<Runtime> = std::cell::RefCell::new(Runtime::new());
}

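/// Wrap an async body in a generated `fn main` driven by [`Runtime::block_on`].
/// A usage sketch (`my_crate` stands in for this crate's actual name):
///
/// ```ignore
/// my_crate::main! {
///     println!("hello from the runtime");
/// }
/// ```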
#[macro_export]
macro_rules! main {
    ($($body:tt)*) => {
        fn main() {
            let rt = $crate::Runtime::new();
            rt.block_on(async { $($body)* });
        }
    };
}

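/// Yield control back to the executor exactly once: the first poll wakes the
/// task and returns `Pending`, the second poll returns `Ready`.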
pub async fn yield_now() {
    struct YieldNow {
        yielded: bool,
    }

    impl Future for YieldNow {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.yielded {
                Poll::Ready(())
            } else {
                self.yielded = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    YieldNow { yielded: false }.await
}

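/// Sleep until `duration` has elapsed. This is a polling sleep: the future
/// re-wakes itself each time it is polled before the deadline, trading CPU
/// time for simplicity (there is no timer wheel).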
pub async fn sleep(duration: Duration) {
    struct Sleep {
        when: Instant,
    }

    impl Future for Sleep {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if Instant::now() >= self.when {
                Poll::Ready(())
            } else {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    Sleep {
        when: Instant::now() + duration,
    }
    .await
}

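/// Race `future` against a deadline, returning `Err(TimeoutError)` if the
/// deadline passes first. The deadline is only checked when the future is
/// polled. A usage sketch:
///
/// ```ignore
/// let res = timeout(Duration::from_millis(50), sleep(Duration::from_secs(1))).await;
/// assert!(res.is_err());
/// ```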
pub async fn timeout<F, T>(duration: Duration, future: F) -> Result<T, TimeoutError>
where
    F: Future<Output = T>,
{
    struct Timeout<F> {
        future: Pin<Box<F>>,
        deadline: Instant,
    }

    impl<F: Future> Future for Timeout<F> {
        type Output = Result<F::Output, TimeoutError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if Instant::now() >= self.deadline {
                return Poll::Ready(Err(TimeoutError));
            }

            match self.future.as_mut().poll(cx) {
                Poll::Ready(v) => Poll::Ready(Ok(v)),
                Poll::Pending => {
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }
        }
    }

    Timeout {
        future: Box::pin(future),
        deadline: Instant::now() + duration,
    }
    .await
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TimeoutError;

impl std::fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "operation timed out")
    }
}

impl std::error::Error for TimeoutError {}

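/// A simple bounded/unbounded channel built on `Mutex` + `Condvar`. Waiting
/// sends and receives park the worker thread rather than yielding the task.
/// A usage sketch (assuming an `rt: Runtime` with at least two worker threads
/// is in scope):
///
/// ```ignore
/// let (tx, rx) = channel::unbounded::<u32>();
/// rt.spawn(async move { let _ = tx.send(1).await; });
/// let got = rt.block_on(async move { rx.recv().await });
/// assert_eq!(got, Some(1));
/// ```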
pub mod channel {
    use std::sync::{Arc, Mutex, Condvar};
    use std::collections::VecDeque;

    pub fn bounded<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
        let inner = Arc::new(ChannelInner {
            // Cap the pre-allocation so `unbounded` (capacity == usize::MAX)
            // does not try to reserve an absurd amount of memory up front.
            queue: Mutex::new(VecDeque::with_capacity(capacity.min(1024))),
            condvar: Condvar::new(),
            capacity,
            closed: Mutex::new(false),
        });
        (Sender { inner: inner.clone() }, Receiver { inner })
    }

    pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
        bounded(usize::MAX)
    }

    struct ChannelInner<T> {
        queue: Mutex<VecDeque<T>>,
        condvar: Condvar,
        capacity: usize,
        closed: Mutex<bool>,
    }

    pub struct Sender<T> {
        inner: Arc<ChannelInner<T>>,
    }

    impl<T> Sender<T> {
        pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
            if *self.inner.closed.lock().unwrap() {
                return Err(SendError(value));
            }

            loop {
                let mut queue = self.inner.queue.lock().unwrap();
                if queue.len() < self.inner.capacity {
                    queue.push_back(value);
                    self.inner.condvar.notify_one();
                    return Ok(());
                }
                // Channel is full: wait on the guard we already hold so a
                // receiver's wakeup cannot be missed. This parks the worker
                // thread rather than yielding the task.
                let _guard = self.inner.condvar.wait(queue).unwrap();
            }
        }
    }

    impl<T> Clone for Sender<T> {
        fn clone(&self) -> Self {
            Self { inner: self.inner.clone() }
        }
    }

    impl<T> Drop for Sender<T> {
        fn drop(&mut self) {
            // Heuristic: a strong count of 2 is taken to mean this is the last
            // Sender, with only the single Receiver remaining, so the channel
            // is marked closed and any waiting receiver is woken.
            if Arc::strong_count(&self.inner) == 2 {
                *self.inner.closed.lock().unwrap() = true;
                self.inner.condvar.notify_all();
            }
        }
    }

    pub struct Receiver<T> {
        inner: Arc<ChannelInner<T>>,
    }

    impl<T> Receiver<T> {
        pub async fn recv(&self) -> Option<T> {
            loop {
                let mut queue = self.inner.queue.lock().unwrap();
                if let Some(value) = queue.pop_front() {
                    // Wake a sender that may be waiting for free capacity.
                    self.inner.condvar.notify_one();
                    return Some(value);
                }
                if *self.inner.closed.lock().unwrap() && queue.is_empty() {
                    return None;
                }
                // Queue is empty: wait on the guard we already hold so a
                // sender's wakeup cannot be missed. This parks the worker
                // thread rather than yielding the task.
                let _guard = self.inner.condvar.wait(queue).unwrap();
            }
        }
    }

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct SendError<T>(pub T);

    impl<T> std::fmt::Display for SendError<T> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "channel closed")
        }
    }

    impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
}

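/// Minimal non-blocking TCP wrappers that poll the standard library sockets
/// and back off with [`sleep`] on `WouldBlock`. An accept-loop sketch (the
/// address is illustrative):
///
/// ```ignore
/// let addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
/// let listener = net::TcpListener::bind(addr).await?;
/// let (mut stream, _peer) = listener.accept().await?;
/// stream.write_all(b"hello").await?;
/// ```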
pub mod net {
    use std::io;
    use std::net::{TcpListener as StdListener, TcpStream as StdStream, SocketAddr};

    pub struct TcpListener(StdListener);
    pub struct TcpStream(StdStream);

    impl TcpListener {
        pub async fn bind(addr: SocketAddr) -> io::Result<Self> {
            let listener = StdListener::bind(addr)?;
            listener.set_nonblocking(true)?;
            Ok(Self(listener))
        }

        pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
            loop {
                match self.0.accept() {
                    Ok((stream, addr)) => {
                        stream.set_nonblocking(true)?;
                        return Ok((TcpStream(stream), addr));
                    }
                    // No pending connection yet: back off briefly and retry.
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        crate::sleep(std::time::Duration::from_millis(10)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        }
    }

    impl TcpStream {
        pub async fn connect(addr: SocketAddr) -> io::Result<Self> {
            // Note: the connect itself is blocking; only subsequent I/O is
            // performed in non-blocking mode.
            let stream = StdStream::connect(addr)?;
            stream.set_nonblocking(true)?;
            Ok(Self(stream))
        }

        pub fn into_std(self) -> StdStream {
            self.0
        }

        pub fn as_std(&self) -> &StdStream {
            &self.0
        }

        pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            use std::io::Read;
            loop {
                match self.0.read(buf) {
                    Ok(n) => return Ok(n),
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        crate::sleep(std::time::Duration::from_millis(1)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        }

        pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            use std::io::Write;
            loop {
                match self.0.write(buf) {
                    Ok(n) => return Ok(n),
                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                        crate::sleep(std::time::Duration::from_millis(1)).await;
                    }
                    Err(e) => return Err(e),
                }
            }
        }

        pub async fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
            while !buf.is_empty() {
                let n = self.write(buf).await?;
                // A zero-length write would loop forever; surface it as an error.
                if n == 0 {
                    return Err(io::ErrorKind::WriteZero.into());
                }
                buf = &buf[n..];
            }
            Ok(())
        }
    }
}

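/// I/O helpers that cooperate with the runtime by sleeping on `WouldBlock`
/// instead of spinning. A copy sketch (`reader` and `writer` stand for any
/// `Read`/`Write` values):
///
/// ```ignore
/// let copied = io::copy(&mut reader, &mut writer).await?;
/// ```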
pub mod io {
    use std::io::{self, Read, Write};

    pub async fn copy<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<u64> {
        let mut buf = [0u8; 8192];
        let mut total = 0u64;

        loop {
            match reader.read(&mut buf) {
                Ok(0) => return Ok(total),
                Ok(n) => {
                    // Note: write_all on a non-blocking writer may itself fail
                    // with WouldBlock; this helper assumes the writer can accept
                    // the data or that the caller handles the error.
                    writer.write_all(&buf[..n])?;
                    total += n as u64;
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    crate::sleep(std::time::Duration::from_millis(1)).await;
                }
                Err(e) => return Err(e),
            }
        }
    }
}