cyfs_bdt/ndn/stack.rs

use std::{
    time::Duration,
    sync::atomic::{AtomicU64, Ordering},
};
use async_std::{
    sync::Arc,
    task,
    future
};
use cyfs_base::*;

use crate::{
    types::*,
    stack::{WeakStack, Stack},
    utils::*
};
use super::{
    channel::{self, ChannelManager},
    chunk::{self, ChunkManager, ChunkReader},
    event::*,
    root::RootTask,
};

/// Timing and sub-module configuration for the NDN stack.
#[derive(Clone)]
pub struct Config {
    /// period of the fine-grained timer that drives `on_time_escape`
    pub atomic_interval: Duration,
    /// minimum gap between two scheduling passes
    pub schedule_interval: Duration,
    pub channel: channel::Config,
    pub chunk: chunk::Config
}

struct StackImpl {
    stack: WeakStack,
    /// bucky timestamp (microseconds) of the last scheduling pass
    last_schedule: AtomicU64,
    chunk_manager: ChunkManager,
    channel_manager: ChannelManager,
    event_handler: Box<dyn NdnEventHandler>,
    root_task: RootTask,
}

#[derive(Clone)]
pub struct NdnStack(Arc<StackImpl>);

impl NdnStack {
    /// Creates the NDN stack; `store` falls back to an in-memory chunk store
    /// and `event_handler` to the default handler when not supplied.
    pub(crate) fn open(
        stack: WeakStack,
        store: Option<Box<dyn ChunkReader>>,
        event_handler: Option<Box<dyn NdnEventHandler>>,
    ) -> Self {
        let store = store.unwrap_or_else(|| Box::new(MemChunkStore::new()));
        let event_handler = event_handler.unwrap_or_else(|| Box::new(DefaultNdnEventHandler::new()));
        let strong_stack = Stack::from(&stack);

        Self(Arc::new(StackImpl {
            stack: stack.clone(),
            last_schedule: AtomicU64::new(0),
            chunk_manager: ChunkManager::new(stack.clone(), store),
            channel_manager: ChannelManager::new(stack.clone()),
            event_handler,
            root_task: RootTask::new(100000, strong_stack.config().ndn.channel.history_speed.clone()),
        }))
    }

    /// Spawns the timer loop that ticks `on_time_escape` once per
    /// `atomic_interval`, minus however long the tick itself took.
    pub(crate) fn start(&self) {
        let stack = Stack::from(&self.0.stack);
        let atomic_interval = stack.config().ndn.atomic_interval;
        {
            let ndn = self.clone();
            task::spawn(async move {
                loop {
                    let start = bucky_time_now();
                    ndn.on_time_escape(start);
                    let end = bucky_time_now();
                    let escaped = Duration::from_micros(end - start);
                    if escaped < atomic_interval {
                        // timeout over a never-ready future acts as a sleep
                        // for the remainder of the interval
                        let _ = future::timeout(atomic_interval - escaped, future::pending::<()>()).await;
                    }
                }
            });
        }
    }

    /// Called on every timer tick: runs a scheduling pass over the channel
    /// manager, root task and chunk manager at most once per
    /// `schedule_interval`, then always forwards the tick to the channel
    /// manager.
    fn on_time_escape(&self, now: Timestamp) {
        let stack = Stack::from(&self.0.stack);
        let last_schedule = self.0.last_schedule.load(Ordering::SeqCst);
        if now > last_schedule
            && Duration::from_micros(now - last_schedule) > stack.config().ndn.schedule_interval {
            self.channel_manager().on_schedule(now);
            self.root_task().on_schedule(now);
            self.chunk_manager().on_schedule(now);
            self.0.last_schedule.store(now, Ordering::SeqCst);
        }
        self.channel_manager().on_time_escape(now);
    }

    pub fn chunk_manager(&self) -> &ChunkManager {
        &self.0.chunk_manager
    }

    pub fn root_task(&self) -> &RootTask {
        &self.0.root_task
    }

    pub fn channel_manager(&self) -> &ChannelManager {
        &self.0.channel_manager
    }

    pub(super) fn event_handler(&self) -> &dyn NdnEventHandler {
        self.0.event_handler.as_ref()
    }
}
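
// A minimal, self-contained sketch of the sleep pattern `start` relies on:
// awaiting `future::timeout` over a never-ready `future::pending` blocks for
// the full timeout, which is how the loop above paces itself to
// `atomic_interval`. The test module and its 20ms interval are illustrative
// assumptions, not part of the original file.
#[cfg(test)]
mod timer_sketch {
    use std::time::{Duration, Instant};
    use async_std::{task, future};

    #[test]
    fn pending_timeout_acts_as_sleep() {
        task::block_on(async {
            let interval = Duration::from_millis(20);
            let begin = Instant::now();
            // the inner future never completes, so the timeout always fires
            assert!(future::timeout(interval, future::pending::<()>()).await.is_err());
            assert!(begin.elapsed() >= interval);
        });
    }
}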