// sparkles/global_storage.rs
use std::io::Read;
5use std::{mem, thread};
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::thread::{JoinHandle};
8use std::time::{Duration, Instant};
9use log::{debug, error, trace, warn};
10use parking_lot::{Condvar, Mutex};
11use ringbuf::traits::{Consumer, Observer, Producer};
12use sparkles_core::{Timestamp, TimestampProvider};
13use sparkles_core::protocol::headers::{LocalPacketHeader, SparklesMachineInfo};
14use sparkles_core::protocol::packets::{send_failed_pages, send_graceful_shutdown, send_machine_info, send_timestamp_freq, send_trace_data};
15use sparkles_core::protocol::sender::{ConfiguredSender, Sender, SenderChain};
16use crate::config::SparklesConfig;
17use crate::{flush_thread_local, on_client_connect, GLOBAL_FLUSHING_RUNNING, THREAD_LOCAL_NOTIFICATION};
18use crate::sender::file_sender::FileSender;
19use crate::thread_local_storage::set_local_storage_config;
20
/// Process-wide trace storage shared by all instrumented threads;
/// `None` until sparkles is initialized.
pub static GLOBAL_STORAGE: Mutex<Option<GlobalStorage>> = Mutex::new(None);
/// Set once by `finalize()`; tells the sender thread to drain everything and exit.
static FINALIZE_STARTED: AtomicBool = AtomicBool::new(false);
/// Wakes the sender thread early (buffer above flush threshold, or finalize requested).
static SENDER_THREAD_ITERATION: Condvar = Condvar::new();
24
/// Process-global event storage: a byte ring buffer holding length-prefixed
/// trace pages, plus the handle of the background sender thread.
pub struct GlobalStorage {
    config: SparklesConfig,
    // Ring buffer of framed records: [header_len u64 BE][bincode header][buf_len u64 BE][payload].
    inner: ringbuf::LocalRb<ringbuf::storage::Heap<u8>>,
    // Background sender thread; taken out (and joined) during finalize.
    sending_thread: Option<JoinHandle<()>>,

    // Headers of pages dropped during cleanup; reported later as "failed pages".
    skipped_msr_pages_headers: Vec<LocalPacketHeader>,
}
32
33impl GlobalStorage {
34 pub fn new(config: SparklesConfig) -> Self {
36 set_local_storage_config(config.local_storage_config);
38
39 let jh = spawn_sending_task(config.clone());
40
41 let global_capacity = config.global_capacity;
42 Self {
43 config,
44 inner: ringbuf::LocalRb::new(global_capacity),
45 sending_thread: Some(jh),
46
47 skipped_msr_pages_headers: Vec::new(),
48 }
49 }
50
51
52 pub fn push_buf(&mut self, header: &LocalPacketHeader, buf: &[u8]) {
54 let header = bincode::encode_to_vec(header, bincode::config::standard()).unwrap();
56 let header_len = (header.len() as u64).to_be_bytes();
57 let bufer_len = (buf.len() as u64).to_be_bytes();
58
59 self.inner.push_slice(&header_len);
60 self.inner.push_slice(&header);
61 self.inner.push_slice(&bufer_len);
62 self.inner.push_slice(buf);
63
64 if self.inner.occupied_len() > (self.config.cleanup_threshold * self.config.global_capacity as f64) as usize {
65 warn!("[sparkles] BUFFER FULL! starting cleanup..");
66 let mut header_len = [0u8; 8];
67 let mut buf_len = [0u8; 8];
68 let mut header_bytes = Vec::new();
69 while self.inner.occupied_len() > (self.config.cleanup_bottom_threshold * self.config.global_capacity as f64) as usize {
70 self.inner.read_exact(&mut header_len).unwrap();
71 let header_len = u64::from_be_bytes(header_len) as usize;
72
73 header_bytes.resize(header_len, 0);
74 self.inner.read_exact(&mut header_bytes).unwrap();
75 let (header, _) = bincode::decode_from_slice(&header_bytes, bincode::config::standard()).unwrap();
76
77 self.inner.read_exact(&mut buf_len).unwrap();
78 let buf_len = u64::from_be_bytes(buf_len) as usize;
79 self.inner.skip(buf_len);
80 self.skipped_msr_pages_headers.push(header);
81 }
82 }
83 }
84
85 fn take_failed_pages(&mut self) -> Vec<LocalPacketHeader> {
86 mem::take(&mut self.skipped_msr_pages_headers)
87 }
88
89 fn try_take_buf(&mut self, take_everything: bool) -> Option<(Vec<u8>, Vec<u8>)> {
90 let threshold = if take_everything {
91 0
92 } else {
93 self.config.flush_threshold
94 };
95 if self.inner.occupied_len() > threshold {
96 use crate as sparkles;
97 #[cfg(feature="self-tracing")]
98 let g = sparkles_macro::range_event_start!("[internal] Taking stored events");
99
100 debug!("[sparkles] Flushing..");
101 let slices = self.inner.as_slices();
102 let slices = (slices.0.to_vec(), slices.1.to_vec());
103 self.inner.clear();
104 Some(slices)
105 }
106 else {
107 None
108 }
109 }
110
111 fn take_jh(&mut self) -> Option<JoinHandle<()>> {
112 self.sending_thread.take()
113 }
114
115 pub fn check_notify(&self) {
116 let thr = self.config.flush_threshold;
117 if self.inner.occupied_len() > thr {
118 SENDER_THREAD_ITERATION.notify_one();
119 }
120 }
121}
122
/// Spawns the background sender thread.
///
/// The thread builds a `SenderChain` from the configured sinks (file and,
/// feature-gated, UDP), announces machine info and the timestamp frequency,
/// then loops: poll senders, re-announce frequency on request or on a timer,
/// drain the global storage, and ship data/failed pages — until
/// `FINALIZE_STARTED` is observed, at which point it sends a graceful
/// shutdown and exits.
fn spawn_sending_task(config: SparklesConfig) -> JoinHandle<()> {
    thread::Builder::new().name("[Sparkles] Sender thread".to_string()).spawn(move || {
        debug!("[sparkles] Flush thread started!");

        // Assemble output sinks. A sink that fails to construct is skipped
        // with a warning rather than aborting the thread.
        let mut sender_chain = SenderChain::default();
        if let Some(file_sender_config) = config.file_sender_config.as_ref() {
            if let Some(sender) = FileSender::new(file_sender_config) {
                sender_chain.with_sender(sender);
            }
            else {
                warn!("[sparkles] Failed to create file sender!");
            }
        }
        // With UDP streaming, a connected client normally triggers
        // `on_client_connect`; without a UDP sender it is called immediately.
        #[cfg(feature = "udp-streaming")]
        if let Some(udp_sender_config) = config.udp_sender_config.as_ref() {
            if let Some(sender) = crate::sender::udp_sender::UdpSender::new(udp_sender_config) {
                sender_chain.with_sender(sender);
            }
            else {
                warn!("[sparkles] Failed to create UDP sender!");
                on_client_connect();
            }
        }
        else {
            on_client_connect();
        }
        #[cfg(not(feature = "udp-streaming"))]
        on_client_connect();

        let process_name = std::env::current_exe().unwrap().file_name().unwrap().to_str().unwrap().to_string();
        let pid = std::process::id();

        let mut freq_detector = TimestampFreqDetector::start(Duration::from_millis(100));

        let info_header = SparklesMachineInfo::new(process_name, pid);
        send_machine_info(&mut sender_chain, info_header.clone());

        // Brief pause so the first frequency measurement spans a non-trivial window.
        thread::sleep(Duration::from_millis(1));

        let (ticks_per_sec, cur_tm) = freq_detector.next_forced();
        send_timestamp_freq(&mut sender_chain, ticks_per_sec, cur_tm);

        let mut last_sender_poll_tm: Option<Instant> = None;

        // Dummy mutex backing the condvar wait below; it guards no data.
        let tmp_mutex = Mutex::new(());
        loop {
            use crate as sparkles;

            // Poll senders at most once per 200 ms.
            if last_sender_poll_tm.is_none_or(|tm| tm.elapsed() > Duration::from_millis(200)) {
                sender_chain.poll();
                last_sender_poll_tm = Some(Instant::now());
            }

            // A sender asked for a fresh frequency: bump the thread-local
            // notification counter and re-send both freq and machine info.
            if sender_chain.take_tm_freq_requested() {
                THREAD_LOCAL_NOTIFICATION.fetch_add(1, Ordering::Relaxed);

                let (ticks_per_sec, cur_tm) = freq_detector.next_forced();
                send_timestamp_freq(&mut sender_chain, ticks_per_sec, cur_tm);
                send_machine_info(&mut sender_chain, info_header.clone());
            }
            else if let Some((ticks_per_sec, cur_tm)) = freq_detector.next() {
                send_timestamp_freq(&mut sender_chain, ticks_per_sec, cur_tm);
            }

            let is_finalizing = FINALIZE_STARTED.load(Ordering::Relaxed);
            if is_finalizing {
                debug!("[sparkles] Finalize detected!");
            }

            let (slices, failed_pages) = {
                // NOTE(review): the cfg gates the whole `if`, so
                // `flush_thread_local` (flushing this sender thread's own
                // events) only runs with self-tracing enabled — presumably
                // intended, since only then does this thread record events;
                // confirm.
                #[cfg(feature="self-tracing")]
                if is_finalizing {
                    sparkles_macro::instant_event!("[internal] Finalizing");
                    flush_thread_local();
                }

                // Hold the global lock only long enough to copy the data out;
                // `GLOBAL_FLUSHING_RUNNING` is raised for the copy window.
                if let Some(global_storage) = GLOBAL_STORAGE.lock().as_mut() {
                    let failed_pages = global_storage.take_failed_pages();

                    GLOBAL_FLUSHING_RUNNING.store(true, Ordering::Relaxed);
                    (global_storage.try_take_buf(is_finalizing), failed_pages)
                }
                else {
                    (None, Vec::new())
                }
            };
            GLOBAL_FLUSHING_RUNNING.store(false, Ordering::Relaxed);

            if let Some((slice1, slice2)) = slices {
                // RAII guard: range event ends when `grd` drops after sending.
                #[cfg(feature="self-tracing")]
                let grd = crate::range_event_start(crate::calculate_hash("[internal] Send data bytes"), "[internal] Send data bytes");
                send_trace_data(&mut sender_chain, &slice1, &slice2);
            }

            if !failed_pages.is_empty() {
                trace!("Sending {} failed pages", failed_pages.len());
                send_failed_pages(&mut sender_chain, &failed_pages)
            }

            if is_finalizing {
                debug!("[internal] Finalize in process...");
                send_graceful_shutdown(&mut sender_chain);
                break;
            }

            // Sleep up to 50 ms waiting for a wakeup. The flag is re-checked
            // first; a notify racing between the check and the wait is not
            // lost for long — the timeout bounds the delay.
            if !FINALIZE_STARTED.load(Ordering::Relaxed) {
                let mut mutex = tmp_mutex.lock();
                #[cfg(feature="self-tracing")]
                let g = sparkles_macro::range_event_start!("[internal] Waiting for signal");
                if SENDER_THREAD_ITERATION.wait_for(&mut mutex, Duration::from_millis(50)).timed_out() {
                    #[cfg(feature="self-tracing")]
                    sparkles_macro::range_event_end!(g, "Timeout!");
                }
            }
        }

        debug!("[sparkles] Quit from flush thread!");
    }).unwrap()
}
246
247pub fn finalize() {
249 use crate as sparkles;
250 #[cfg(feature="self-tracing")]
251 sparkles_macro::instant_event!("[internal] Finalize requested");
252
253 flush_thread_local();
255
256 FINALIZE_STARTED.store(true, Ordering::SeqCst);
257 SENDER_THREAD_ITERATION.notify_one();
258 let jh = if let Some(global_storage) = GLOBAL_STORAGE.lock().as_mut() {
259 global_storage.take_jh()
260 } else {
261 None
262 };
263
264 if let Some(jh) = jh {
265 debug!("[sparkles] Joining sparkles flush thread...");
266 let _ = jh.join().inspect_err(|e| {
267 error!("Error while joining sparkles' flush thread! {:?}", e);
268 });
269 }
270
271}
272
/// Measures the rate of the raw timestamp counter (ticks per second) by
/// comparing `Timestamp::now()` deltas against wall-clock `Instant` deltas.
struct TimestampFreqDetector {
    // Raw timestamp at the last measurement.
    prev_tm: u64,
    // Wall-clock instant at the last measurement.
    prev_instant: Instant,

    // Minimum window between two automatic (`next`) measurements.
    capture_interval: Duration,
}
279
280impl TimestampFreqDetector {
281 pub fn start(interval: Duration) -> Self {
282 let now = Instant::now();
283 let now_tm = Timestamp::now();
284 Self {
285 prev_instant: now,
286 prev_tm: now_tm,
287
288 capture_interval: interval,
289 }
290 }
291 pub fn next(&mut self) -> Option<(u64, u64)> {
292 if self.prev_instant.elapsed() > self.capture_interval {
293 Some(self.next_forced())
294 }
295 else {
296 None
297 }
298 }
299
300 pub fn next_forced(&mut self) -> (u64, u64) {
301 let now = Instant::now();
302 let now_tm = Timestamp::now();
303
304 let elapsed_tm = now_tm.wrapping_sub(self.prev_tm) as f64;
305 let elapsed_ns = (now - self.prev_instant).as_nanos() as f64;
306 let ticks_per_sec = elapsed_tm / elapsed_ns * 1_000_000_000.0;
307
308 self.prev_tm = now_tm;
309 self.prev_instant = now;
310
311 (ticks_per_sec as u64, now_tm)
312 }
313}