sparkles_core/local_storage/
mod.rs1use alloc::string::String;
2use alloc::vec::Vec;
3use core::marker::PhantomData;
4use core::sync::atomic::{AtomicUsize, Ordering};
5use crate::config::LocalStorageConfig;
6use crate::local_storage::id_mapping::{EventType, IdMappingState};
7use crate::protocol::headers::{LocalPacketHeader, ThreadInfo};
8use crate::Timestamp;
9
10use crate::timestamp::TimestampProvider;
11
12pub mod id_mapping;
13
14pub trait GlobalStorageImpl {
15 fn flush(&self, header: &LocalPacketHeader, data: &[u8]);
16 fn try_flush(&self, header: &LocalPacketHeader, data: &[u8]) -> bool;
17 fn is_buf_available(&self) -> bool;
18 fn take_new_update(&mut self) -> bool;
19}
20
21pub struct LocalStorage<G: GlobalStorageImpl> {
22 config: LocalStorageConfig,
23
24 prev_tm: u64,
25
26 buf: Vec<u8>,
27 id_store: IdMappingState,
28
29 local_packet_header: LocalPacketHeader,
30
31 global_storage_ref: G,
32 last_range_ord_id: u8,
33
34 started_ranges: [bool; 256],
35 started_ranges_cnt: usize,
36
37 flush_event_hash: u32,
38 flush_event_str: &'static str,
39
40 thread_name: Option<String>,
41}
42
43static CUR_THREAD_ID: AtomicUsize = AtomicUsize::new(1);
44
45impl<G: GlobalStorageImpl> LocalStorage<G> {
46 pub fn new(global_storage_ref: G, thread_info: ThreadInfo, config: LocalStorageConfig)-> Self {
47 let thread_ord_id = CUR_THREAD_ID.fetch_add(1, Ordering::Relaxed) as u64;
48
49 let thread_name = thread_info.new_thread_name.clone();
50 let flush_event_str = "[sparkles] Flushing local storage";
51 let flush_event_hash = sparkles_macro::calc_hash!("[sprkles] Flushing local storage");
52 LocalStorage {
53 config,
54 buf: Vec::new(),
55 prev_tm: 0,
56
57 id_store: Default::default(),
58 local_packet_header: LocalPacketHeader {
59 thread_ord_id,
60 thread_info,
61
62 ..Default::default()
63 },
64
65 global_storage_ref,
66 last_range_ord_id: 0,
67 started_ranges: [false; 256],
68 started_ranges_cnt: 0,
69
70 flush_event_hash,
71 flush_event_str,
72
73 thread_name,
74 }
75 }
76
77 fn new_range_ord_id(&mut self) -> u8 {
78 let range_ord_id = self.last_range_ord_id;
79 if self.started_ranges_cnt == 256 {
80 self.last_range_ord_id = self.last_range_ord_id.wrapping_add(1);
81 range_ord_id
82 }
83 else {
84 self.last_range_ord_id = self.last_range_ord_id.wrapping_add(1);
85 while self.started_ranges[self.last_range_ord_id as usize] {
86 self.last_range_ord_id = self.last_range_ord_id.wrapping_add(1);
87 }
88
89 self.started_ranges[range_ord_id as usize] = true;
90 self.started_ranges_cnt += 1;
91 range_ord_id
92 }
93 }
94
95 #[inline(always)]
96 pub fn event_range_start(&mut self, hash: u32, name: &str) -> RangeStartRepr {
97 self.event_range_start_inner(hash, name, false)
98 }
99
100 fn event_range_start_inner(&mut self, hash: u32, name: &str, prevent_flushing: bool) -> RangeStartRepr {
101 let range_ord_id = self.new_range_ord_id();
103 let start_id = self.id_store.insert_and_get_id(hash, name, EventType::RangeStart);
104 self.range_event(Some(start_id), range_ord_id, prevent_flushing);
105
106 RangeStartRepr {
107 range_ord_id,
108 range_start_id: start_id,
109
110 _not_send: PhantomData
111 }
112 }
113
114 #[inline(always)]
115 pub fn event_range_end(&mut self, range_start: RangeStartRepr, hash: u32, name: &str) {
116 self.event_range_end_inner(range_start, hash, name, false);
117 }
118
119 #[inline(always)]
120 fn event_range_end_inner(&mut self, range_start: RangeStartRepr, hash: u32, name: &str, prevent_flushing: bool) {
121 let range_ord_id = range_start.range_ord_id;
122 self.started_ranges[range_start.range_ord_id as usize] = false;
123 self.started_ranges_cnt -= 1;
124 let start_id = range_start.range_start_id;
125 if hash != 0 {
126 let end_id = self.id_store.insert_and_get_id(hash, name, EventType::RangeEnd(start_id));
127 self.range_event(Some(end_id), range_ord_id, prevent_flushing);
128 }
129 else {
130 self.range_event(None, range_ord_id, prevent_flushing);
131 }
132 }
133
134 #[inline(always)]
135 fn range_event(&mut self, id: Option<u8>, range_ord_id: u8, prevent_flushing: bool) {
136 let timestamp = Timestamp::now();
139
140 let dif_tm = self.update_local_info(timestamp);
142
143 let dif_tm_bytes: [u8; 8] = dif_tm.to_le_bytes();
145 let dif_tm_bytes_len = ((Timestamp::TIMESTAMP_VALID_BITS as u32 + 7 - dif_tm.leading_zeros()) >> 3) as u8;
146 let buf = match id {
147 Some(id) => [id, dif_tm_bytes_len | 0x80, range_ord_id],
148 None => [0, dif_tm_bytes_len | 0xC0, range_ord_id]
149 };
150 self.buf.extend_from_slice(&buf);
151 self.buf.extend_from_slice(&dif_tm_bytes[..dif_tm_bytes_len as usize]);
152
153
154 if !prevent_flushing {
156 self.auto_flush();
157 }
158 }
159
160
161 #[inline(always)]
162 pub fn event_instant(&mut self, hash: u32, string: &str) {
163 let id = self.id_store.insert_and_get_id(hash, string, EventType::Instant);
165 self.event(id);
166 }
167
168 #[inline(always)]
169 fn event(&mut self, id: u8) {
170 let timestamp = Timestamp::now();
173
174 let dif_tm = self.update_local_info(timestamp);
176
177 let dif_tm_bytes: [u8; 8] = dif_tm.to_le_bytes();
179 let dif_tm_bytes_len = ((Timestamp::TIMESTAMP_VALID_BITS as u32 + 7 - dif_tm.leading_zeros()) >> 3) as u8;
180 let buf = [id, dif_tm_bytes_len];
181 self.buf.extend_from_slice(&buf);
182 self.buf.extend_from_slice(&dif_tm_bytes[..dif_tm_bytes_len as usize]);
183
184
185 self.auto_flush();
187 }
188
189 #[inline(always)]
190 fn update_local_info(&mut self, timestamp: u64) -> u64 {
191 let mut dif_tm = timestamp.wrapping_sub(self.prev_tm);
192 self.prev_tm = timestamp;
193 if self.local_packet_header.start_timestamp == 0 {
194 self.local_packet_header.start_timestamp = timestamp;
195 dif_tm = 0;
196 }
197 dif_tm
198 }
199
200 pub fn set_cur_thread_name(&mut self, name: String) {
201 self.thread_name = Some(name);
202 self.local_packet_header.thread_info.new_thread_name = self.thread_name.clone();
203 }
204
205 #[inline(always)]
207 pub fn auto_flush(&mut self) {
208 if self.buf.len() >= self.config.flush_threshold {
209 self.flush(true);
210 }
211 else if self.buf.len() >= self.config.flush_attempt_threshold && self.global_storage_ref.is_buf_available() {
212 self.flush(false);
213 }
214 }
215
216 pub fn flush(&mut self, blocking: bool) {
218 if self.buf.is_empty() {
219 return;
221 }
222
223 #[cfg(feature = "self-tracing")]
224 let range_event = self.event_range_start_inner(self.flush_event_hash, self.flush_event_str, true);
225 let new_update = self.global_storage_ref.take_new_update();
226 if new_update {
227 self.local_packet_header.thread_info.new_thread_name = self.thread_name.clone();
228 }
229
230 self.local_packet_header.end_timestamp = self.prev_tm;
232 self.local_packet_header.id_store = self.id_store.clone().into();
233
234 let success = if blocking {
235 self.global_storage_ref.flush(&self.local_packet_header, &self.buf);
236 true
237 }
238 else {
239 self.global_storage_ref.try_flush(&self.local_packet_header, &self.buf)
240 };
241
242 if success {
244 self.buf.clear();
245 if self.local_packet_header.thread_info.new_thread_name.is_some() {
246 self.local_packet_header.thread_info.new_thread_name = None;
247 }
248 self.local_packet_header.start_timestamp = 0;
249 }
250 #[cfg(feature = "self-tracing")]
251 self.event_range_end_inner(range_event, 0, "", true);
252 }
253}
254
255impl<G: GlobalStorageImpl> Drop for LocalStorage<G> {
256 fn drop(&mut self) {
257 self.flush(true);
258 }
259}
260
261#[derive(Copy, Clone)]
262pub struct RangeStartRepr {
263 range_start_id: u8, range_ord_id: u8, _not_send: PhantomData<*const ()>
267}