sparkles/
global_storage.rs

//! Single global storage for sparkles events.
//! All events are flushed into GLOBAL_STORAGE and then head towards the transport abstraction (UDP/TCP/file).
3
4use std::io::Read;
5use std::{mem, thread};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::thread::{JoinHandle};
8use std::time::{Duration, Instant};
9use log::{debug, error, trace, warn};
10use parking_lot::{Condvar, Mutex};
11use ringbuf::traits::{Consumer, Observer, Producer};
12use sparkles_core::{Timestamp, TimestampProvider};
13use sparkles_core::protocol::headers::{LocalPacketHeader, SparklesMachineInfo};
14use sparkles_core::protocol::packets::{send_failed_pages, send_graceful_shutdown, send_machine_info, send_timestamp_freq, send_trace_data};
15use sparkles_core::protocol::sender::{ConfiguredSender, Sender, SenderChain};
16use crate::config::SparklesConfig;
17use crate::{flush_thread_local, on_client_connect, GLOBAL_FLUSHING_RUNNING, THREAD_LOCAL_NOTIFICATION};
18use crate::sender::file_sender::FileSender;
19use crate::thread_local_storage::set_local_storage_config;
20
/// Process-wide event storage guarded by a mutex. Starts as `None`; the code that installs a
/// [`GlobalStorage`] into it is outside this section of the file.
pub static GLOBAL_STORAGE: Mutex<Option<GlobalStorage>> = Mutex::new(None);
/// Set to `true` by [`finalize`]; the sender thread reads it on every loop iteration and, once it
/// is set, takes everything that is left in the storage, sends a graceful shutdown and exits.
static FINALIZE_STARTED: AtomicBool = AtomicBool::new(false);
/// Condition variable the sender thread waits on (for up to 50 ms) between iterations.
/// Notified by `GlobalStorage::check_notify` and by [`finalize`] to wake the thread early.
static SENDER_THREAD_ITERATION: Condvar = Condvar::new();
24
/// Global byte storage that collects encoded pages pushed by thread-local storages
/// until the sender thread takes them.
pub struct GlobalStorage {
    /// Configuration this storage was created with (capacity and thresholds are read from it).
    config: SparklesConfig,
    /// Byte ring buffer holding length-prefixed records: header length, header, data length, data.
    inner: ringbuf::LocalRb<ringbuf::storage::Heap<u8>>,
    /// Handle of the sender thread spawned in `new`; taken out by `finalize` to join it.
    sending_thread: Option<JoinHandle<()>>,

    /// Headers of pages that were dropped from `inner` during cleanup and have not been
    /// reported yet; drained by `take_failed_pages`.
    skipped_msr_pages_headers: Vec<LocalPacketHeader>,
}
32
33impl GlobalStorage {
34    /// Create new global storage with given config and spawn sending thread
35    pub fn new(config: SparklesConfig) -> Self {
36        // Set local storage config
37        set_local_storage_config(config.local_storage_config);
38
39        let jh = spawn_sending_task(config.clone());
40
41        let global_capacity = config.global_capacity;
42        Self {
43            config,
44            inner: ringbuf::LocalRb::new(global_capacity),
45            sending_thread: Some(jh),
46
47            skipped_msr_pages_headers: Vec::new(),
48        }
49    }
50
51
52    /// Called by thread local storage to put its contents into global storage
53    pub fn push_buf(&mut self, header: &LocalPacketHeader, buf: &[u8]) {
54        // info!("Got new local buffer. start: {}, end: {}", header.start_timestamp, header.end_timestamp);
55        let header = bincode::encode_to_vec(header, bincode::config::standard()).unwrap();
56        let header_len = (header.len() as u64).to_be_bytes();
57        let bufer_len = (buf.len() as u64).to_be_bytes();
58
59        self.inner.push_slice(&header_len);
60        self.inner.push_slice(&header);
61        self.inner.push_slice(&bufer_len);
62        self.inner.push_slice(buf);
63
64        if self.inner.occupied_len() > (self.config.cleanup_threshold * self.config.global_capacity as f64) as usize {
65            warn!("[sparkles] BUFFER FULL! starting cleanup..");
66            let mut header_len = [0u8; 8];
67            let mut buf_len = [0u8; 8];
68            let mut header_bytes = Vec::new();
69            while self.inner.occupied_len() > (self.config.cleanup_bottom_threshold * self.config.global_capacity as f64) as usize {
70                self.inner.read_exact(&mut header_len).unwrap();
71                let header_len = u64::from_be_bytes(header_len) as usize;
72
73                header_bytes.resize(header_len, 0);
74                self.inner.read_exact(&mut header_bytes).unwrap();
75                let (header, _) = bincode::decode_from_slice(&header_bytes, bincode::config::standard()).unwrap();
76
77                self.inner.read_exact(&mut buf_len).unwrap();
78                let buf_len = u64::from_be_bytes(buf_len) as usize;
79                self.inner.skip(buf_len);
80                self.skipped_msr_pages_headers.push(header);
81            }
82        }
83    }
84
85    fn take_failed_pages(&mut self) -> Vec<LocalPacketHeader> {
86        mem::take(&mut self.skipped_msr_pages_headers)
87    }
88
89    fn try_take_buf(&mut self, take_everything: bool) -> Option<(Vec<u8>, Vec<u8>)> {
90        let threshold = if take_everything {
91            0
92        } else {
93            self.config.flush_threshold
94        };
95        if self.inner.occupied_len() > threshold {
96            use crate as sparkles;
97            #[cfg(feature="self-tracing")]
98            let g = sparkles_macro::range_event_start!("[internal] Taking stored events");
99            
100            debug!("[sparkles] Flushing..");
101            let slices = self.inner.as_slices();
102            let slices = (slices.0.to_vec(), slices.1.to_vec());
103            self.inner.clear();
104            Some(slices)
105        }
106        else {
107            None
108        }
109    }
110
111    fn take_jh(&mut self) -> Option<JoinHandle<()>> {
112        self.sending_thread.take()
113    }
114    
115    pub fn check_notify(&self) {
116        let thr = self.config.flush_threshold;
117        if self.inner.occupied_len() > thr {
118            SENDER_THREAD_ITERATION.notify_one();
119        }
120    }
121}
122
123fn spawn_sending_task(config: SparklesConfig) -> JoinHandle<()> {
124    thread::Builder::new().name("[Sparkles] Sender thread".to_string()).spawn(move || {
125        debug!("[sparkles] Flush thread started!");
126
127        let mut sender_chain = SenderChain::default();
128        if let Some(file_sender_config) = config.file_sender_config.as_ref() {
129            if let Some(sender) = FileSender::new(file_sender_config) {
130                sender_chain.with_sender(sender);
131            }
132            else {
133                warn!("[sparkles] Failed to create file sender!");
134            }
135        }
136        #[cfg(feature = "udp-streaming")]
137        if let Some(udp_sender_config) = config.udp_sender_config.as_ref() {
138            if let Some(sender) = crate::sender::udp_sender::UdpSender::new(udp_sender_config) {
139                sender_chain.with_sender(sender);
140            }
141            else {
142                warn!("[sparkles] Failed to create UDP sender!");
143                on_client_connect();
144            }
145        }
146        else {
147            on_client_connect();
148        }
149        #[cfg(not(feature = "udp-streaming"))]
150        on_client_connect();
151
152        let process_name = std::env::current_exe().unwrap().file_name().unwrap().to_str().unwrap().to_string();
153        let pid = std::process::id();
154
155        let mut freq_detector = TimestampFreqDetector::start(Duration::from_millis(100));
156
157        let info_header = SparklesMachineInfo::new(process_name, pid);
158        send_machine_info(&mut sender_chain, info_header.clone());
159
160        thread::sleep(Duration::from_millis(1));
161
162        let (ticks_per_sec, cur_tm) = freq_detector.next_forced();
163        send_timestamp_freq(&mut sender_chain, ticks_per_sec, cur_tm);
164
165        let mut last_sender_poll_tm: Option<Instant> = None;
166        
167        let tmp_mutex = Mutex::new(());
168        loop {
169            use crate as sparkles;
170            
171            if last_sender_poll_tm.is_none_or(|tm| tm.elapsed() > Duration::from_millis(200)) {
172                sender_chain.poll();
173                last_sender_poll_tm = Some(Instant::now());
174            }
175            
176            if sender_chain.take_tm_freq_requested() {
177                THREAD_LOCAL_NOTIFICATION.fetch_add(1, Ordering::Relaxed);
178                
179                let (ticks_per_sec, cur_tm) = freq_detector.next_forced();
180                send_timestamp_freq(&mut sender_chain, ticks_per_sec, cur_tm);
181                send_machine_info(&mut sender_chain, info_header.clone());
182            }
183            else if let Some((ticks_per_sec, cur_tm)) = freq_detector.next() {
184                send_timestamp_freq(&mut sender_chain, ticks_per_sec, cur_tm);
185            }
186
187            // Read value before flushing
188            let is_finalizing = FINALIZE_STARTED.load(Ordering::Relaxed);
189            if is_finalizing {
190                debug!("[sparkles] Finalize detected!");
191            }
192
193            // this thing should be fast
194            let (slices, failed_pages) = {
195                #[cfg(feature="self-tracing")]
196                if is_finalizing {
197                    sparkles_macro::instant_event!("[internal] Finalizing");
198                    flush_thread_local();
199                }
200
201                if let Some(global_storage) = GLOBAL_STORAGE.lock().as_mut() {
202                    let failed_pages = global_storage.take_failed_pages();
203                    
204                    GLOBAL_FLUSHING_RUNNING.store(true, Ordering::Relaxed);
205                    (global_storage.try_take_buf(is_finalizing), failed_pages)
206                }
207                else {
208                    (None, Vec::new())
209                }
210            };
211            GLOBAL_FLUSHING_RUNNING.store(false, Ordering::Relaxed);
212
213            // handle buffers
214            if let Some((slice1, slice2)) = slices {
215                #[cfg(feature="self-tracing")]
216                let grd = crate::range_event_start(crate::calculate_hash("[internal] Send data bytes"), "[internal] Send data bytes");
217                send_trace_data(&mut sender_chain, &slice1, &slice2);
218            }
219
220            // handle failed pages
221            if !failed_pages.is_empty() {
222                trace!("Sending {} failed pages", failed_pages.len());
223                send_failed_pages(&mut sender_chain, &failed_pages)
224            }
225
226            if is_finalizing {
227                debug!("[internal] Finalize in process...");
228                send_graceful_shutdown(&mut sender_chain);
229                break;
230            }
231            
232            if !FINALIZE_STARTED.load(Ordering::Relaxed) {
233                let mut mutex = tmp_mutex.lock();
234                #[cfg(feature="self-tracing")]
235                let g = sparkles_macro::range_event_start!("[internal] Waiting for signal");
236                if SENDER_THREAD_ITERATION.wait_for(&mut mutex, Duration::from_millis(50)).timed_out() {
237                    #[cfg(feature="self-tracing")]
238                    sparkles_macro::range_event_end!(g, "Timeout!");
239                }
240            }
241        }
242
243        debug!("[sparkles] Quit from flush thread!");
244    }).unwrap()
245}
246
247/// Blocking wait for global sending thread to finish its job
248pub fn finalize() {
249    use crate as sparkles;
250    #[cfg(feature="self-tracing")]
251    sparkles_macro::instant_event!("[internal] Finalize requested");
252    
253    // Flush current thread
254    flush_thread_local();
255
256    FINALIZE_STARTED.store(true, Ordering::SeqCst);
257    SENDER_THREAD_ITERATION.notify_one();
258    let jh = if let Some(global_storage) = GLOBAL_STORAGE.lock().as_mut() {
259        global_storage.take_jh()
260    } else {
261        None
262    };
263
264    if let Some(jh) = jh {
265        debug!("[sparkles] Joining sparkles flush thread...");
266        let _ = jh.join().inspect_err(|e| {
267            error!("Error while joining sparkles' flush thread! {:?}", e);
268        });
269    }
270
271}
272
/// Estimates the timestamp counter frequency (ticks per second) by comparing
/// `Timestamp::now()` deltas against wall-clock `Instant` deltas.
struct TimestampFreqDetector {
    /// Timestamp counter value captured at the previous measurement.
    prev_tm: u64,
    /// Wall-clock instant captured at the previous measurement.
    prev_instant: Instant,

    /// Minimum time between two measurements returned by `next`.
    capture_interval: Duration,
}
279
280impl TimestampFreqDetector {
281    pub fn start(interval: Duration) -> Self {
282        let now = Instant::now();
283        let now_tm = Timestamp::now();
284        Self {
285            prev_instant: now,
286            prev_tm: now_tm,
287
288            capture_interval: interval,
289        }
290    }
291    pub fn next(&mut self) -> Option<(u64, u64)> {
292        if self.prev_instant.elapsed() > self.capture_interval {
293            Some(self.next_forced())
294        }
295        else {
296            None
297        }
298    }
299
300    pub fn next_forced(&mut self) -> (u64, u64) {
301        let now = Instant::now();
302        let now_tm = Timestamp::now();
303
304        let elapsed_tm = now_tm.wrapping_sub(self.prev_tm) as f64;
305        let elapsed_ns = (now - self.prev_instant).as_nanos() as f64;
306        let ticks_per_sec = elapsed_tm / elapsed_ns * 1_000_000_000.0;
307
308        self.prev_tm = now_tm;
309        self.prev_instant = now;
310
311        (ticks_per_sec as u64, now_tm)
312    }
313}