sparkles_core/local_storage/
mod.rs

1use alloc::string::String;
2use alloc::vec::Vec;
3use core::marker::PhantomData;
4use core::sync::atomic::{AtomicUsize, Ordering};
5use crate::config::LocalStorageConfig;
6use crate::local_storage::id_mapping::{EventType, IdMappingState};
7use crate::protocol::headers::{LocalPacketHeader, ThreadInfo};
8use crate::Timestamp;
9
10use crate::timestamp::TimestampProvider;
11
12pub mod id_mapping;
13
14pub trait GlobalStorageImpl {
15    fn flush(&self, header: &LocalPacketHeader, data: &[u8]);
16    fn try_flush(&self, header: &LocalPacketHeader, data: &[u8]) -> bool;
17    fn is_buf_available(&self) -> bool;
18    fn take_new_update(&mut self) -> bool;
19}
20
21pub struct LocalStorage<G: GlobalStorageImpl> {
22    config: LocalStorageConfig,
23    
24    prev_tm: u64,
25
26    buf: Vec<u8>,
27    id_store: IdMappingState,
28
29    local_packet_header: LocalPacketHeader,
30
31    global_storage_ref: G,
32    last_range_ord_id: u8,
33    
34    started_ranges: [bool; 256],
35    started_ranges_cnt: usize,
36
37    flush_event_hash: u32,
38    flush_event_str: &'static str,
39    
40    thread_name: Option<String>,
41}
42
/// Process-wide counter that hands out a unique ordinal to every
/// `LocalStorage` created; starts at 1.
static CUR_THREAD_ID: AtomicUsize = AtomicUsize::new(1);
44
45impl<G: GlobalStorageImpl> LocalStorage<G> {
46    pub fn new(global_storage_ref: G, thread_info: ThreadInfo, config: LocalStorageConfig)-> Self {
47        let thread_ord_id = CUR_THREAD_ID.fetch_add(1, Ordering::Relaxed) as u64;
48
49        let thread_name = thread_info.new_thread_name.clone();
50        let flush_event_str = "[sparkles] Flushing local storage";
51        let flush_event_hash = sparkles_macro::calc_hash!("[sprkles] Flushing local storage");
52        LocalStorage {
53            config,
54            buf: Vec::new(),
55            prev_tm: 0,
56
57            id_store: Default::default(),
58            local_packet_header: LocalPacketHeader {
59                thread_ord_id,
60                thread_info,
61
62                ..Default::default()
63            },
64
65            global_storage_ref,
66            last_range_ord_id: 0,
67            started_ranges: [false; 256],
68            started_ranges_cnt: 0,
69
70            flush_event_hash,
71            flush_event_str,
72
73            thread_name,
74        }
75    }
76
77    fn new_range_ord_id(&mut self) -> u8 {
78        let range_ord_id = self.last_range_ord_id;
79        if self.started_ranges_cnt == 256 {
80            self.last_range_ord_id = self.last_range_ord_id.wrapping_add(1);
81            range_ord_id
82        }
83        else {
84            self.last_range_ord_id = self.last_range_ord_id.wrapping_add(1);
85            while self.started_ranges[self.last_range_ord_id as usize] {
86                self.last_range_ord_id = self.last_range_ord_id.wrapping_add(1);
87            }
88            
89            self.started_ranges[range_ord_id as usize] = true;
90            self.started_ranges_cnt += 1;
91            range_ord_id
92        }
93    }
94
95    #[inline(always)]
96    pub fn event_range_start(&mut self, hash: u32, name: &str) -> RangeStartRepr {
97        self.event_range_start_inner(hash, name, false)
98    }
99
100    fn event_range_start_inner(&mut self, hash: u32, name: &str, prevent_flushing: bool) -> RangeStartRepr {
101        // On a new range event we acquire new range_ord_id to match start and end events
102        let range_ord_id = self.new_range_ord_id();
103        let start_id = self.id_store.insert_and_get_id(hash, name, EventType::RangeStart);
104        self.range_event(Some(start_id), range_ord_id, prevent_flushing);
105
106        RangeStartRepr {
107            range_ord_id,
108            range_start_id: start_id,
109
110            _not_send: PhantomData
111        }
112    }
113
114    #[inline(always)]
115    pub fn event_range_end(&mut self, range_start: RangeStartRepr, hash: u32, name: &str) {
116        self.event_range_end_inner(range_start, hash, name, false);
117    }
118
119    #[inline(always)]
120    fn event_range_end_inner(&mut self, range_start: RangeStartRepr, hash: u32, name: &str, prevent_flushing: bool) {
121        let range_ord_id = range_start.range_ord_id;
122        self.started_ranges[range_start.range_ord_id as usize] = false;
123        self.started_ranges_cnt -= 1;
124        let start_id = range_start.range_start_id;
125        if hash != 0 {
126            let end_id = self.id_store.insert_and_get_id(hash, name, EventType::RangeEnd(start_id));
127            self.range_event(Some(end_id), range_ord_id, prevent_flushing);
128        }
129        else {
130            self.range_event(None, range_ord_id, prevent_flushing);
131        }
132    }
133
134    #[inline(always)]
135    fn range_event(&mut self, id: Option<u8>, range_ord_id: u8, prevent_flushing: bool) {
136        //      STAGE 2: Acquire timestamp and calculate now, dif_tm
137        //    (3ns on non-serializing x86 timestamp, 11ns on serializing x86 timestamp)
138        let timestamp = Timestamp::now();
139
140        //      STAGE 3: Update local info
141        let dif_tm = self.update_local_info(timestamp);
142
143        //      STAGE 4: PUSH VALUES
144        let dif_tm_bytes: [u8; 8] = dif_tm.to_le_bytes();
145        let dif_tm_bytes_len = ((Timestamp::TIMESTAMP_VALID_BITS as u32 + 7 - dif_tm.leading_zeros()) >> 3) as u8;
146        let buf = match id {
147            Some(id) => [id, dif_tm_bytes_len | 0x80, range_ord_id],
148            None => [0, dif_tm_bytes_len | 0xC0, range_ord_id]
149        };
150        self.buf.extend_from_slice(&buf);
151        self.buf.extend_from_slice(&dif_tm_bytes[..dif_tm_bytes_len as usize]);
152
153
154        //      STAGE 5: flushing
155        if !prevent_flushing {
156            self.auto_flush();
157        }
158    }
159
160
161    #[inline(always)]
162    pub fn event_instant(&mut self, hash: u32, string: &str) {
163        //      STAGE 1: insert string and get ID.
164        let id = self.id_store.insert_and_get_id(hash, string, EventType::Instant);
165        self.event(id);
166    }
167
168    #[inline(always)]
169    fn event(&mut self, id: u8) {
170        //      STAGE 2: Acquire timestamp and calculate now, dif_tm
171        //    (3ns on non-serializing x86 timestamp, 11ns on serializing x86 timestamp)
172        let timestamp = Timestamp::now();
173
174        //      STAGE 3: Update local info
175        let dif_tm = self.update_local_info(timestamp);
176
177        //      STAGE 4: PUSH VALUES
178        let dif_tm_bytes: [u8; 8] = dif_tm.to_le_bytes();
179        let dif_tm_bytes_len = ((Timestamp::TIMESTAMP_VALID_BITS as u32 + 7 - dif_tm.leading_zeros()) >> 3) as u8;
180        let buf = [id, dif_tm_bytes_len];
181        self.buf.extend_from_slice(&buf);
182        self.buf.extend_from_slice(&dif_tm_bytes[..dif_tm_bytes_len as usize]);
183
184
185        //      STAGE 5: flushing
186        self.auto_flush();
187    }
188
189    #[inline(always)]
190    fn update_local_info(&mut self, timestamp: u64) -> u64 {
191        let mut dif_tm = timestamp.wrapping_sub(self.prev_tm);
192        self.prev_tm = timestamp;
193        if self.local_packet_header.start_timestamp == 0 {
194            self.local_packet_header.start_timestamp = timestamp;
195            dif_tm = 0;
196        }
197        dif_tm
198    }
199
200    pub fn set_cur_thread_name(&mut self, name: String) {
201        self.thread_name = Some(name);
202        self.local_packet_header.thread_info.new_thread_name = self.thread_name.clone();
203    }
204
205    /// Check buffer length, and flush if the buffer is full
206    #[inline(always)]
207    pub fn auto_flush(&mut self) {
208        if self.buf.len() >= self.config.flush_threshold {
209            self.flush(true);
210        }
211        else if self.buf.len() >= self.config.flush_attempt_threshold && self.global_storage_ref.is_buf_available() {
212            self.flush(false);
213        }
214    }
215
216    /// Flush whole event buffer data to the global storage
217    pub fn flush(&mut self, blocking: bool) {
218        if self.buf.is_empty() {
219            // Nothing to flush, ignore
220            return;
221        }
222
223        #[cfg(feature = "self-tracing")]
224        let range_event = self.event_range_start_inner(self.flush_event_hash, self.flush_event_str, true);
225        let new_update = self.global_storage_ref.take_new_update();
226        if new_update {
227            self.local_packet_header.thread_info.new_thread_name = self.thread_name.clone();
228        }
229
230        // Fill header
231        self.local_packet_header.end_timestamp = self.prev_tm;
232        self.local_packet_header.id_store = self.id_store.clone().into();
233
234        let success = if blocking {
235            self.global_storage_ref.flush(&self.local_packet_header, &self.buf);
236            true
237        }
238        else {
239            self.global_storage_ref.try_flush(&self.local_packet_header, &self.buf)
240        };
241
242        //cleanup
243        if success {
244            self.buf.clear();
245            if self.local_packet_header.thread_info.new_thread_name.is_some() {
246                self.local_packet_header.thread_info.new_thread_name = None;
247            }
248            self.local_packet_header.start_timestamp = 0;
249        }
250        #[cfg(feature = "self-tracing")]
251        self.event_range_end_inner(range_event, 0, "", true);
252    }
253}
254
255impl<G: GlobalStorageImpl> Drop for LocalStorage<G> {
256    fn drop(&mut self) {
257        self.flush(true);
258    }
259}
260
/// Token returned when a range is started; it must be passed back to end
/// that range.
#[derive(Copy, Clone)]
pub struct RangeStartRepr {
    range_start_id: u8, // required to create potentially new end event
    range_ord_id: u8, // required to match with start event during parsing

    // A raw pointer is neither `Send` nor `Sync`, so this marker keeps the
    // token on the thread that created it.
    _not_send: PhantomData<*const ()>,
}