assemblylift_core_event/
threader.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::sync::Mutex;
4
5use crossbeam_utils::atomic::AtomicCell;
6use tokio::runtime::Runtime;
7
8use assemblylift_core_event_common::constants::EVENT_BUFFER_SIZE_BYTES;
9use assemblylift_core_event_common::EventMemoryDocument;
10
11lazy_static! {
12 static ref EVENT_MEMORY: Mutex<EventMemory> = Mutex::new(EventMemory::new());
13}
14
15pub struct Threader {
16 runtime: Runtime,
17}
18
19impl Threader {
20 pub fn new() -> Self {
21 use tokio::runtime::Builder;
22
23 let runtime = Builder::new()
24 .threaded_scheduler()
25 .enable_all()
26 .build()
27 .unwrap();
28
29 Threader { runtime }
30 }
31
32 pub fn next_event_id(&mut self) -> Option<u32> {
33 match EVENT_MEMORY.lock() {
34 Ok(mut memory) => memory.next_id(),
35 Err(_) => None,
36 }
37 }
38
39 pub fn is_event_ready(&self, event_id: u32) -> bool {
40 match EVENT_MEMORY.lock() {
41 Ok(memory) => memory.is_ready(event_id),
42 Err(_) => false,
43 }
44 }
45
46 pub fn get_event_memory_document(&mut self, event_id: u32) -> Option<EventMemoryDocument> {
47 println!("TRACE: get_event_memory_document event_id={}", event_id);
48 match EVENT_MEMORY.lock() {
49 Ok(memory) => {
50 println!(
51 "DEBUG: num keys in document map: {}",
52 memory.document_map.keys().len()
53 );
54 match memory.document_map.get(&event_id) {
55 Some(doc) => Some(doc.clone()),
56 None => None,
57 }
58 }
59 Err(_) => None,
60 }
61 }
62
63 pub fn spawn_with_event_id(
64 &mut self,
65 writer: *const AtomicCell<u8>,
66 future: impl Future<Output = Vec<u8>> + 'static + Send,
67 event_id: u32,
68 ) {
69 println!("TRACE: spawn_with_event_id");
70
71 let slc = unsafe { std::slice::from_raw_parts(writer, EVENT_BUFFER_SIZE_BYTES) };
73
74 println!("TRACE: spawning on tokio runtime");
75 self.runtime.spawn(async move {
76 println!("TRACE: awaiting IO...");
77 let serialized = future.await;
78 println!("TRACE: IO complete");
79
80 EVENT_MEMORY
81 .lock()
82 .unwrap()
83 .write_vec_at(slc, serialized, event_id);
84 });
85 println!("TRACE: spawned");
86 }
87
88 pub fn __reset_memory() {
89 if let Ok(mut memory) = EVENT_MEMORY.lock() {
90 memory.__reset();
91 }
92 }
93}
94
95struct EventMemory {
96 _next_id: u32,
97 document_map: HashMap<u32, EventMemoryDocument>,
98 event_map: HashMap<u32, bool>,
99}
100
101impl EventMemory {
102 pub fn new() -> Self {
103 EventMemory {
104 _next_id: 1, document_map: Default::default(),
106 event_map: Default::default(),
107 }
108 }
109
110 pub fn next_id(&mut self) -> Option<u32> {
111 let next_id = self._next_id.clone();
112 self._next_id += 1;
113
114 self.event_map.insert(next_id, false);
115
116 Some(next_id)
117 }
118
119 pub fn is_ready(&self, event_id: u32) -> bool {
120 *self.event_map.get(&event_id).unwrap()
121 }
122
123 pub fn write_vec_at(&mut self, writer: &[AtomicCell<u8>], vec: Vec<u8>, event_id: u32) {
124 println!("TRACE: write_vec_at");
125
126 let response_len = vec.len();
128 println!("DEBUG: response is {} bytes", response_len);
129
130 let start = self.find_with_length(response_len);
131 let end = start + response_len;
132 for i in start..end {
133 writer[i].store(vec[i - start]);
134 }
135 println!("TRACE: stored response");
136
137 self.document_map
139 .insert(event_id, EventMemoryDocument { start, length: end });
140 println!(
141 "TRACE: updated document map id={} start={} end={}",
142 event_id, start, end
143 );
144
145 self.event_map.insert(event_id, true);
147 }
148
149 pub fn __reset(&mut self) {
150 self._next_id = 1;
151 self.document_map = Default::default();
152 self.event_map = Default::default();
153 }
154
155 fn find_with_length(&self, _length: usize) -> usize {
156 let mut max_end = 0usize;
158 for doc in self.document_map.values().into_iter() {
159 let next_end = doc.start + doc.length;
160 if next_end > max_end {
161 max_end = next_end
162 }
163 }
164 max_end
165 }
166}