1use std::{
2 time::Duration,
3 sync::{atomic::{AtomicU64, Ordering}},
4};
5use async_std::{
6 sync::Arc,
7 task,
8 future
9};
10use cyfs_base::*;
11
12use crate::{
13 types::*,
14 stack::{WeakStack, Stack},
15 utils::*
16};
17use super::{
18 channel::{self, ChannelManager},
19 chunk::{self, ChunkManager, ChunkReader},
20 event::*,
21 root::RootTask,
22};
23
24#[derive(Clone)]
25pub struct Config {
26 pub atomic_interval: Duration,
27 pub schedule_interval: Duration,
28 pub channel: channel::Config,
29 pub chunk: chunk::Config
30}
31
32
33struct StackImpl {
34 stack: WeakStack,
35 last_schedule: AtomicU64,
36 chunk_manager: ChunkManager,
37 channel_manager: ChannelManager,
38 event_handler: Box<dyn NdnEventHandler>,
39 root_task: RootTask,
40}
41
42#[derive(Clone)]
43pub struct NdnStack(Arc<StackImpl>);
44
45impl NdnStack {
46 pub(crate) fn open(
47 stack: WeakStack,
48 store: Option<Box<dyn ChunkReader>>,
49 event_handler: Option<Box<dyn NdnEventHandler>>,
50 ) -> Self {
51 let store = store.unwrap_or(Box::new(MemChunkStore::new()));
52 let event_handler = event_handler.unwrap_or(Box::new(DefaultNdnEventHandler::new()));
53 let strong_stack = Stack::from(&stack);
54
55 Self(Arc::new(StackImpl {
56 stack: stack.clone(),
57 last_schedule: AtomicU64::new(0),
58 chunk_manager: ChunkManager::new(stack.clone(), store),
59 channel_manager: ChannelManager::new(stack.clone()),
60 event_handler,
61 root_task: RootTask::new(100000, strong_stack.config().ndn.channel.history_speed.clone()),
62 }))
63 }
64
65 pub(crate) fn start(&self) {
66 let stack = Stack::from(&self.0.stack);
67 let atomic_interval = stack.config().ndn.atomic_interval;
68 {
69 let ndn = self.clone();
70 task::spawn(async move {
71 loop {
72 let start = bucky_time_now();
73 ndn.on_time_escape(start);
74 let end = bucky_time_now();
75 let escaped = Duration::from_micros(end - start);
76 if escaped < atomic_interval {
77 let _ = future::timeout(atomic_interval - escaped, future::pending::<()>()).await;
78 }
79 }
80 });
81 }
82 }
83
84 fn on_time_escape(&self, now: Timestamp) {
85 let stack = Stack::from(&self.0.stack);
86 let last_schedule = self.0.last_schedule.load(Ordering::SeqCst);
87 if now > last_schedule
88 && Duration::from_micros(now - last_schedule) > stack.config().ndn.schedule_interval {
89 self.channel_manager().on_schedule(now);
90 self.root_task().on_schedule(now);
91 self.chunk_manager().on_schedule(now);
92 self.0.last_schedule.store(now, Ordering::SeqCst);
93 }
94 self.channel_manager().on_time_escape(now);
95 }
96
97 pub fn chunk_manager(&self) -> &ChunkManager {
98 &self.0.chunk_manager
99 }
100
101 pub fn root_task(&self) -> &RootTask {
102 &self.0.root_task
103 }
104
105 pub fn channel_manager(&self) -> &ChannelManager {
106 &self.0.channel_manager
107 }
108
109 pub(super) fn event_handler(&self) -> &dyn NdnEventHandler {
110 self.0.event_handler.as_ref()
111 }
112
113
114}
115