Skip to main content

assemblylift_core_event/
threader.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::sync::Mutex;
4
5use crossbeam_utils::atomic::AtomicCell;
6use tokio::runtime::Runtime;
7
8use assemblylift_core_event_common::constants::EVENT_BUFFER_SIZE_BYTES;
9use assemblylift_core_event_common::EventMemoryDocument;
10
11lazy_static! {
12    static ref EVENT_MEMORY: Mutex<EventMemory> = Mutex::new(EventMemory::new());
13}
14
15pub struct Threader {
16    runtime: Runtime,
17}
18
19impl Threader {
20    pub fn new() -> Self {
21        use tokio::runtime::Builder;
22
23        let runtime = Builder::new()
24            .threaded_scheduler()
25            .enable_all()
26            .build()
27            .unwrap();
28
29        Threader { runtime }
30    }
31
32    pub fn next_event_id(&mut self) -> Option<u32> {
33        match EVENT_MEMORY.lock() {
34            Ok(mut memory) => memory.next_id(),
35            Err(_) => None,
36        }
37    }
38
39    pub fn is_event_ready(&self, event_id: u32) -> bool {
40        match EVENT_MEMORY.lock() {
41            Ok(memory) => memory.is_ready(event_id),
42            Err(_) => false,
43        }
44    }
45
46    pub fn get_event_memory_document(&mut self, event_id: u32) -> Option<EventMemoryDocument> {
47        println!("TRACE: get_event_memory_document event_id={}", event_id);
48        match EVENT_MEMORY.lock() {
49            Ok(memory) => {
50                println!(
51                    "DEBUG: num keys in document map: {}",
52                    memory.document_map.keys().len()
53                );
54                match memory.document_map.get(&event_id) {
55                    Some(doc) => Some(doc.clone()),
56                    None => None,
57                }
58            }
59            Err(_) => None,
60        }
61    }
62
63    pub fn spawn_with_event_id(
64        &mut self,
65        writer: *const AtomicCell<u8>,
66        future: impl Future<Output = Vec<u8>> + 'static + Send,
67        event_id: u32,
68    ) {
69        println!("TRACE: spawn_with_event_id");
70
71        // FIXME this is a kludge -- I feel like the raw pointer shouldn't be needed
72        let slc = unsafe { std::slice::from_raw_parts(writer, EVENT_BUFFER_SIZE_BYTES) };
73
74        println!("TRACE: spawning on tokio runtime");
75        self.runtime.spawn(async move {
76            println!("TRACE: awaiting IO...");
77            let serialized = future.await;
78            println!("TRACE: IO complete");
79
80            EVENT_MEMORY
81                .lock()
82                .unwrap()
83                .write_vec_at(slc, serialized, event_id);
84        });
85        println!("TRACE: spawned");
86    }
87
88    pub fn __reset_memory() {
89        if let Ok(mut memory) = EVENT_MEMORY.lock() {
90            memory.__reset();
91        }
92    }
93}
94
95struct EventMemory {
96    _next_id: u32,
97    document_map: HashMap<u32, EventMemoryDocument>,
98    event_map: HashMap<u32, bool>,
99}
100
101impl EventMemory {
102    pub fn new() -> Self {
103        EventMemory {
104            _next_id: 1, // id 0 is reserved (null)
105            document_map: Default::default(),
106            event_map: Default::default(),
107        }
108    }
109
110    pub fn next_id(&mut self) -> Option<u32> {
111        let next_id = self._next_id.clone();
112        self._next_id += 1;
113
114        self.event_map.insert(next_id, false);
115
116        Some(next_id)
117    }
118
119    pub fn is_ready(&self, event_id: u32) -> bool {
120        *self.event_map.get(&event_id).unwrap()
121    }
122
123    pub fn write_vec_at(&mut self, writer: &[AtomicCell<u8>], vec: Vec<u8>, event_id: u32) {
124        println!("TRACE: write_vec_at");
125
126        // Serialize the response
127        let response_len = vec.len();
128        println!("DEBUG: response is {} bytes", response_len);
129
130        let start = self.find_with_length(response_len);
131        let end = start + response_len;
132        for i in start..end {
133            writer[i].store(vec[i - start]);
134        }
135        println!("TRACE: stored response");
136
137        // Update document map
138        self.document_map
139            .insert(event_id, EventMemoryDocument { start, length: end });
140        println!(
141            "TRACE: updated document map id={} start={} end={}",
142            event_id, start, end
143        );
144
145        // Update event status table
146        self.event_map.insert(event_id, true);
147    }
148
149    pub fn __reset(&mut self) {
150        self._next_id = 1;
151        self.document_map = Default::default();
152        self.event_map = Default::default();
153    }
154
155    fn find_with_length(&self, _length: usize) -> usize {
156        // TODO this less stupidly
157        let mut max_end = 0usize;
158        for doc in self.document_map.values().into_iter() {
159            let next_end = doc.start + doc.length;
160            if next_end > max_end {
161                max_end = next_end
162            }
163        }
164        max_end
165    }
166}