1use std::collections::HashMap;
80use std::fs::OpenOptions;
81use std::io::{self, BufWriter, Write};
82use std::path::PathBuf;
83use std::sync::atomic::{AtomicU64, Ordering};
84use std::sync::{Arc, OnceLock, RwLock};
85use std::time::{SystemTime, UNIX_EPOCH};
86use crossbeam_channel::{bounded, Receiver, Sender, TryRecvError};
87
/// Source of `LogEntry::sequence` values; `LogEntry::new` bumps it by one with
/// a relaxed `fetch_add` for every entry it creates.
static SEQUENCE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Process-wide logger, installed at most once by the `init_lightning_log*`
/// functions and read by `__send_log_entry` and the shutdown helpers.
static GLOBAL_LOGGER: OnceLock<LightningLogger> = OnceLock::new();

/// Registry mapping a message id to its format template. Written by
/// `register_message_format` and read by the logging thread when it renders
/// an entry.
static MESSAGE_FORMATS: OnceLock<Arc<RwLock<HashMap<u32, String>>>> = OnceLock::new();
96
/// Compact binary encoding for values handed to the logging macros.
///
/// Implementations append their encoding to a caller-supplied buffer so that a
/// log entry can be assembled in place.
pub trait FastSerialize {
    /// Appends the binary encoding of `self` to `buffer`.
    fn write_to_buffer(&self, buffer: &mut Vec<u8>);

    /// Number of bytes `write_to_buffer` is expected to append.
    ///
    /// The value is only used to pre-size buffers; the default is 8.
    fn size_hint(&self) -> usize {
        8
    }
}

// Implements `FastSerialize` for fixed-width numeric types: the encoding is the
// value's little-endian byte representation and the size hint is its width.
macro_rules! impl_fast_serialize_le {
    ($($ty:ty),+) => {
        $(
            impl FastSerialize for $ty {
                fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
                    buffer.extend_from_slice(&self.to_le_bytes());
                }

                fn size_hint(&self) -> usize {
                    std::mem::size_of::<$ty>()
                }
            }
        )+
    };
}

impl_fast_serialize_le!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);

/// Booleans are encoded as a single byte: `0` for `false`, `1` for `true`.
impl FastSerialize for bool {
    fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
        buffer.push(u8::from(*self));
    }

    fn size_hint(&self) -> usize {
        1
    }
}

// Writes `bytes` preceded by their length as a little-endian `u32`.
//
// The prefix is only 32 bits wide, so a longer payload is cut to `u32::MAX`
// bytes. A plain `len as u32` cast would wrap and leave the prefix out of sync
// with the bytes that follow it.
fn write_length_prefixed(bytes: &[u8], buffer: &mut Vec<u8>) {
    let len = bytes.len().min(u32::MAX as usize);
    buffer.extend_from_slice(&(len as u32).to_le_bytes());
    buffer.extend_from_slice(&bytes[..len]);
}

/// String slices are encoded as a `u32` byte length followed by the UTF-8 bytes.
impl FastSerialize for &str {
    fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
        write_length_prefixed(self.as_bytes(), buffer);
    }

    fn size_hint(&self) -> usize {
        4 + self.len()
    }
}

/// Same encoding as `&str`; lets a borrowed string slice be logged directly.
impl FastSerialize for &&str {
    fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
        write_length_prefixed(self.as_bytes(), buffer);
    }

    fn size_hint(&self) -> usize {
        4 + self.len()
    }
}

/// Same encoding as `&str`.
impl FastSerialize for String {
    fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
        write_length_prefixed(self.as_bytes(), buffer);
    }

    fn size_hint(&self) -> usize {
        4 + self.len()
    }
}

/// Same encoding as `&str`; lets a borrowed `String` be logged directly.
impl FastSerialize for &String {
    fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
        write_length_prefixed(self.as_bytes(), buffer);
    }

    fn size_hint(&self) -> usize {
        4 + self.len()
    }
}

/// Byte slices are encoded as a `u32` length followed by the raw bytes.
impl FastSerialize for &[u8] {
    fn write_to_buffer(&self, buffer: &mut Vec<u8>) {
        write_length_prefixed(self, buffer);
    }

    fn size_hint(&self) -> usize {
        4 + self.len()
    }
}
223
/// A sink that receives every formatted log line.
#[derive(Clone, Debug)]
pub enum LogDestination {
    /// The process's standard output.
    Stdout,
    /// The process's standard error.
    Stderr,
    /// A file at the given path, opened in append mode and created if missing.
    File(PathBuf),
    /// A group of destinations; each member receives every line.
    Multiple(Vec<LogDestination>),
}
236
/// Settings consumed by `LightningLogger::with_config`.
#[derive(Debug, Clone)]
pub struct LoggerConfig {
    /// Capacity of the bounded channel between callers and the logging thread.
    pub channel_capacity: usize,
    /// Destinations the logging thread opens and writes every line to.
    pub destinations: Vec<LogDestination>,
    /// Capacity, in bytes, of the `BufWriter` placed in front of each destination.
    pub file_buffer_size: usize,
    /// When set, the logging thread pins itself to the last reported core.
    /// Only has an effect when the crate's `cpu-affinity` feature is enabled.
    pub enable_cpu_affinity: bool,
}
249
250impl Default for LoggerConfig {
251 fn default() -> Self {
252 Self {
253 channel_capacity: 10000,
254 destinations: vec![LogDestination::Stdout],
255 file_buffer_size: 8192,
256 enable_cpu_affinity: true,
257 }
258 }
259}
260
/// Severity of a log entry, ordered from least (`Debug`) to most (`Error`)
/// severe.
///
/// The derived ordering follows the discriminants, so levels can be compared
/// directly (for example `level >= LogLevel::Warn`) and used as map keys.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Diagnostic detail.
    Debug = 0,
    /// Routine information.
    Info = 1,
    /// Something unexpected that does not stop processing.
    Warn = 2,
    /// A failure.
    Error = 3,
}
270
271impl std::fmt::Display for LogLevel {
272 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
273 match self {
274 LogLevel::Debug => write!(f, "DEBUG"),
275 LogLevel::Info => write!(f, "INFO"),
276 LogLevel::Warn => write!(f, "WARN"),
277 LogLevel::Error => write!(f, "ERROR"),
278 }
279 }
280}
281
/// One log record as it travels from the caller to the logging thread.
#[derive(Debug)]
pub struct LogEntry {
    /// Value taken from the global sequence counter when the entry was created.
    pub sequence: u64,
    /// Creation time in nanoseconds since the Unix epoch.
    pub timestamp_nanos: u64,
    /// Severity of the entry.
    pub level: LogLevel,
    /// Identifier used to look up the registered format template.
    pub message_id: u32,
    /// Arguments in their `FastSerialize` binary encoding, in call order.
    pub data: Vec<u8>,
}
291
292impl LogEntry {
293 pub fn new(level: LogLevel, message_id: u32, estimated_size: usize) -> Self {
295 let sequence = SEQUENCE_COUNTER.fetch_add(1, Ordering::Relaxed);
296 let timestamp_nanos = SystemTime::now()
297 .duration_since(UNIX_EPOCH)
298 .unwrap_or_default()
299 .as_nanos() as u64;
300
301 Self {
302 sequence,
303 timestamp_nanos,
304 level,
305 message_id,
306 data: Vec::with_capacity(estimated_size + 32), }
308 }
309}
310
/// Fan-out writer: every write and flush is forwarded to each configured sink.
pub struct OutputWriter {
    /// One boxed writer per leaf destination.
    writers: Vec<Box<dyn Write + Send>>,
}
315
316impl OutputWriter {
317 pub fn new(destinations: &[LogDestination], buffer_size: usize) -> io::Result<Self> {
319 let mut writers = Vec::new();
320 Self::collect_writers(destinations, buffer_size, &mut writers)?;
321 Ok(Self { writers })
322 }
323
324 fn collect_writers(destinations: &[LogDestination], buffer_size: usize, writers: &mut Vec<Box<dyn Write + Send + 'static>>) -> io::Result<()> {
326 for dest in destinations {
327 match dest {
328 LogDestination::Stdout => {
329 writers.push(Box::new(BufWriter::with_capacity(buffer_size, io::stdout())) as Box<dyn Write + Send>);
330 }
331 LogDestination::Stderr => {
332 writers.push(Box::new(BufWriter::with_capacity(buffer_size, io::stderr())) as Box<dyn Write + Send>);
333 }
334 LogDestination::File(path) => {
335 let file = OpenOptions::new()
336 .create(true)
337 .append(true)
338 .open(path)?;
339 writers.push(Box::new(BufWriter::with_capacity(buffer_size, file)) as Box<dyn Write + Send>);
340 }
341 LogDestination::Multiple(dests) => {
342 Self::collect_writers(dests, buffer_size, writers)?;
344 }
345 }
346 }
347 Ok(())
348 }
349
350 pub fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
352 for writer in &mut self.writers {
353 writer.write_all(data)?;
354 }
355 Ok(())
356 }
357
358 pub fn flush(&mut self) -> io::Result<()> {
360 for writer in &mut self.writers {
361 writer.flush()?;
362 }
363 Ok(())
364 }
365}
366
367impl Drop for OutputWriter {
368 fn drop(&mut self) {
369 let _ = self.flush();
370 }
371}
372
373pub struct LightningLogger {
375 sender: Sender<LogEntry>,
376 _handle: std::thread::JoinHandle<()>,
377 shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
378}
379
380impl LightningLogger {
381 pub fn new() -> Self {
383 Self::with_config(LoggerConfig::default())
384 }
385
386 pub fn with_capacity(capacity: usize) -> Self {
388 let mut config = LoggerConfig::default();
389 config.channel_capacity = capacity;
390 Self::with_config(config)
391 }
392
393 pub fn with_config(config: LoggerConfig) -> Self {
395 let (sender, receiver) = bounded(config.channel_capacity);
396 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
397
398 let config_clone = config.clone();
399 let shutdown_clone = shutdown.clone();
400 let handle = std::thread::spawn(move || {
401 Self::logging_thread(receiver, config_clone, shutdown_clone);
402 });
403
404 Self {
405 sender,
406 _handle: handle,
407 shutdown,
408 }
409 }
410
411 fn logging_thread(receiver: Receiver<LogEntry>, config: LoggerConfig, shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>) {
413 #[cfg(feature = "cpu-affinity")]
415 if config.enable_cpu_affinity {
416 if let Some(core_ids) = core_affinity::get_core_ids() {
417 if let Some(&last_core) = core_ids.last() {
418 let _ = core_affinity::set_for_current(last_core);
419 }
420 }
421 }
422
423 let mut output_writer = match OutputWriter::new(&config.destinations, config.file_buffer_size) {
425 Ok(writer) => writer,
426 Err(e) => {
427 eprintln!("Failed to initialize output writer: {}", e);
428 return;
429 }
430 };
431
432 loop {
433 match receiver.try_recv() {
434 Ok(entry) => {
435 Self::process_entry(entry, &mut output_writer);
436 },
437 Err(TryRecvError::Empty) => {
438 if shutdown.load(std::sync::atomic::Ordering::Relaxed) {
440 let mut remaining = Vec::new();
442 while let Ok(entry) = receiver.try_recv() {
443 remaining.push(entry);
444 }
445
446 for entry in remaining {
447 Self::process_entry(entry, &mut output_writer);
448 }
449
450 let _ = output_writer.flush();
452 break;
453 }
454
455 std::thread::yield_now();
457 },
458 Err(TryRecvError::Disconnected) => {
459 let _ = output_writer.flush();
461 break;
462 }
463 }
464 }
465 }
466 fn process_entry(entry: LogEntry, output_writer: &mut OutputWriter) {
468 let secs = entry.timestamp_nanos / 1_000_000_000;
469 let nanos = entry.timestamp_nanos % 1_000_000_000;
470
471 let format_registry = MESSAGE_FORMATS.get().map(|r| r.read().ok()).flatten();
473
474 let log_line = if let Some(registry) = format_registry {
475 if let Some(format) = registry.get(&entry.message_id) {
476 match Self::decode_and_format_data(&entry.data, format) {
478 Ok(formatted_data) => {
479 format!("[{}] seq:{} ts:{}.{:09} msg_id:{} {}\n",
480 entry.level, entry.sequence, secs, nanos,
481 entry.message_id, formatted_data)
482 },
483 Err(_) => {
484 let hex_data = Self::hex_dump(&entry.data);
486 format!("[{}] seq:{} ts:{}.{:09} msg_id:{} fmt:'{}' data:{}\n",
487 entry.level, entry.sequence, secs, nanos,
488 entry.message_id, format, hex_data)
489 }
490 }
491 } else {
492 let hex_data = Self::hex_dump(&entry.data);
494 format!("[{}] seq:{} ts:{}.{:09} msg_id:{} data:{}\n",
495 entry.level, entry.sequence, secs, nanos,
496 entry.message_id, hex_data)
497 }
498 } else {
499 let hex_data = Self::hex_dump(&entry.data);
501 format!("[{}] seq:{} ts:{}.{:09} msg_id:{} data:{}\n",
502 entry.level, entry.sequence, secs, nanos,
503 entry.message_id, hex_data)
504 };
505
506 let _ = output_writer.write_all(log_line.as_bytes());
508 }
509
510 fn decode_and_format_data(data: &[u8], format: &str) -> Result<String, &'static str> {
512 let mut data_offset = 0;
513
514 let mut output = String::new();
516 let mut i = 0;
517
518 while i < format.len() {
519 if i + 1 < format.len() && &format[i..i+2] == "{}" {
520 match Self::decode_next_arg(data, &mut data_offset) {
522 Ok(decoded_value) => {
523 output.push_str(&decoded_value);
524 },
525 Err(_) => {
526 return Err("Failed to decode binary data");
527 }
528 }
529 i += 2;
530 } else {
531 output.push(format.as_bytes()[i] as char);
532 i += 1;
533 }
534 }
535
536 Ok(output)
537 }
538
/// Decodes the next value from `data` starting at `*offset` and advances
/// `*offset` past it.
///
/// The payload carries no type tags, so the width is guessed from the number
/// of bytes left: 8 or more are read as one 64-bit value, 4 to 7 as one
/// 32-bit value, fewer as a single byte (`0` renders as `false`, `1` as
/// `true`).
///
/// A 64- or 32-bit value is rendered as a float only when its bit pattern is
/// a normal float of plausible magnitude (`1e-9..=1e15` for `f64`,
/// `1e-6..=1e9` for `f32`); otherwise it is rendered as an unsigned integer.
/// Small integers reinterpret as subnormal floats, so this keeps them from
/// printing as `0.000000`, and it lets whole-valued floats such as `20000.0`
/// print as floats rather than as their raw bits.
///
/// # Errors
/// Returns `"Not enough data"` when `*offset` is at or past the end of `data`.
fn decode_next_arg(data: &[u8], offset: &mut usize) -> Result<String, &'static str> {
    let rest = match data.get(*offset..) {
        Some(rest) if !rest.is_empty() => rest,
        _ => return Err("Not enough data"),
    };

    if rest.len() >= 8 {
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&rest[..8]);
        *offset += 8;

        let bits = u64::from_le_bytes(raw);
        let float = f64::from_bits(bits);
        let magnitude = float.abs();
        let plausible_float = float.is_normal() && magnitude >= 1e-9 && magnitude <= 1e15;
        return Ok(if plausible_float {
            format!("{:.6}", float)
        } else {
            bits.to_string()
        });
    }

    if rest.len() >= 4 {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(&rest[..4]);
        *offset += 4;

        let bits = u32::from_le_bytes(raw);
        let float = f32::from_bits(bits);
        let magnitude = float.abs();
        let plausible_float = float.is_normal() && magnitude >= 1e-6 && magnitude <= 1e9;
        return Ok(if plausible_float {
            format!("{:.3}", float)
        } else {
            bits.to_string()
        });
    }

    // One to three bytes left: decode a single byte.
    let byte = rest[0];
    *offset += 1;
    Ok(match byte {
        0 => "false".to_string(),
        1 => "true".to_string(),
        other => other.to_string(),
    })
}
587
/// Formats `data` as space-separated two-digit lower-case hex bytes inside
/// square brackets, e.g. `[de ad 01]`. Empty input yields `[]`.
fn hex_dump(data: &[u8]) -> String {
    let octets: Vec<String> = data.iter().map(|byte| format!("{:02x}", byte)).collect();
    format!("[{}]", octets.join(" "))
}
604
605 pub fn log(&self, entry: LogEntry) -> Result<(), LogEntry> {
607 match self.sender.try_send(entry) {
608 Ok(()) => Ok(()),
609 Err(crossbeam_channel::TrySendError::Full(entry)) => Err(entry),
610 Err(crossbeam_channel::TrySendError::Disconnected(entry)) => Err(entry),
611 }
612 }
613
614 pub fn shutdown(&self) {
616 self.shutdown.store(true, std::sync::atomic::Ordering::Relaxed);
617 }
618
619 pub fn wait_for_shutdown(self) {
621 let _ = self._handle.join();
622 }
623}
624
625pub fn init_lightning_log() -> Result<(), &'static str> {
627 let logger = LightningLogger::new();
628 match GLOBAL_LOGGER.set(logger) {
629 Ok(()) => {
630 let _ = MESSAGE_FORMATS.set(Arc::new(RwLock::new(HashMap::new())));
631 Ok(())
632 },
633 Err(_) => Err("Lightning logger already initialized"),
634 }
635}
636
637pub fn init_lightning_log_with_capacity(capacity: usize) -> Result<(), &'static str> {
639 let logger = LightningLogger::with_capacity(capacity);
640 match GLOBAL_LOGGER.set(logger) {
641 Ok(()) => {
642 let _ = MESSAGE_FORMATS.set(Arc::new(RwLock::new(HashMap::new())));
643 Ok(())
644 },
645 Err(_) => Err("Lightning logger already initialized"),
646 }
647}
648
649pub fn init_lightning_log_with_config(config: LoggerConfig) -> Result<(), &'static str> {
651 let logger = LightningLogger::with_config(config);
652 match GLOBAL_LOGGER.set(logger) {
653 Ok(()) => {
654 let _ = MESSAGE_FORMATS.set(Arc::new(RwLock::new(HashMap::new())));
655 Ok(())
656 },
657 Err(_) => Err("Lightning logger already initialized"),
658 }
659}
660
661pub fn register_message_format(message_id: u32, format: String) -> Result<(), &'static str> {
663 if let Some(formats) = MESSAGE_FORMATS.get() {
664 if let Ok(mut registry) = formats.write() {
665 registry.insert(message_id, format);
666 Ok(())
667 } else {
668 Err("Failed to acquire write lock on message formats")
669 }
670 } else {
671 Err("Lightning logger not initialized")
672 }
673}
674
675#[doc(hidden)]
677pub fn __send_log_entry(entry: LogEntry) {
678 if let Some(logger) = GLOBAL_LOGGER.get() {
679 let _ = logger.log(entry); }
681}
682
/// Returns the global logger, or `None` if none has been installed yet.
pub fn get_global_logger() -> Option<&'static LightningLogger> {
    GLOBAL_LOGGER.get()
}
687
688pub fn shutdown_lightning_log() {
690 if let Some(logger) = GLOBAL_LOGGER.get() {
691 logger.shutdown();
692 }
693}
694
695pub fn wait_for_shutdown() {
697 if let Some(logger) = GLOBAL_LOGGER.get() {
698 logger.shutdown();
701 }
702}
703
/// Sums `FastSerialize::size_hint` over the given expressions; expands to `0`
/// when called without arguments.
///
/// The trait is imported inside the expansion, so the macro also works at
/// call sites that have not imported `FastSerialize`, and it no longer has to
/// invoke itself by an unqualified name.
#[macro_export]
macro_rules! __size_hint {
    ($($arg:expr),*) => {{
        #[allow(unused_imports)]
        use $crate::FastSerialize as _;
        0 $(+ $arg.size_hint())*
    }};
}
711
/// Builds a log entry from `$level`, `$msg_id` and any number of
/// `FastSerialize` arguments, and hands it to the global logger.
///
/// Every argument expression is evaluated exactly once: the arguments are
/// borrowed into a slice first, which is then used both to estimate the
/// payload size and to serialize. Trait methods are called by path, so the
/// call site does not need `FastSerialize` in scope.
#[macro_export]
macro_rules! lightning_log {
    ($level:expr, $msg_id:expr $(, $arg:expr)*) => {{
        let args: &[&dyn $crate::FastSerialize] = &[$(&$arg),*];
        let estimated_size: usize = args
            .iter()
            .map(|arg| $crate::FastSerialize::size_hint(*arg))
            .sum();
        let mut entry = $crate::LogEntry::new($level, $msg_id, estimated_size);

        for arg in args {
            $crate::FastSerialize::write_to_buffer(*arg, &mut entry.data);
        }

        $crate::__send_log_entry(entry);
    }};
}
726
/// Logs the given message id and arguments at `LogLevel::Debug`.
#[macro_export]
macro_rules! lightning_debug {
    ($id:expr $(, $value:expr)*) => {
        $crate::lightning_log!($crate::LogLevel::Debug, $id $(, $value)*)
    };
}
734
/// Logs the given message id and arguments at `LogLevel::Info`.
#[macro_export]
macro_rules! lightning_info {
    ($id:expr $(, $value:expr)*) => {
        $crate::lightning_log!($crate::LogLevel::Info, $id $(, $value)*)
    };
}
742
/// Logs the given message id and arguments at `LogLevel::Warn`.
#[macro_export]
macro_rules! lightning_warn {
    ($id:expr $(, $value:expr)*) => {
        $crate::lightning_log!($crate::LogLevel::Warn, $id $(, $value)*)
    };
}
750
/// Logs the given message id and arguments at `LogLevel::Error`.
#[macro_export]
macro_rules! lightning_error {
    ($id:expr $(, $value:expr)*) => {
        $crate::lightning_log!($crate::LogLevel::Error, $id $(, $value)*)
    };
}
758
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: every level macro accepts a mix of argument types.
    #[test]
    fn test_basic_logging() {
        // Another test may have installed the logger first; either way one
        // must be present after this call.
        let _ = init_lightning_log();
        assert!(get_global_logger().is_some());

        let volume = 100.02f64;
        let price = 20000.0f64;
        let flag = true;
        let symbol = "AAPL";

        lightning_info!(1001, volume, price, flag, symbol);
        lightning_debug!(1002, price, symbol);
        lightning_warn!(1003, flag);
        lightning_error!(1004, 42i32);

        // Give the background thread a moment to pick the entries up.
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    /// Smoke test: logging against a registered format template.
    #[test]
    fn test_message_formats() {
        let _ = init_lightning_log();
        assert!(get_global_logger().is_some());
        let _ = register_message_format(2001, "Test message: value={}".to_string());

        lightning_info!(2001, 123.45f64);

        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    /// Pins the wire format produced by the `FastSerialize` impls.
    #[test]
    fn test_fast_serialize_encoding() {
        let mut buffer = Vec::new();
        42i32.write_to_buffer(&mut buffer);
        true.write_to_buffer(&mut buffer);
        "AAPL".write_to_buffer(&mut buffer);

        assert_eq!(buffer, [42, 0, 0, 0, 1, 4, 0, 0, 0, b'A', b'A', b'P', b'L']);
        assert_eq!(42i32.size_hint(), 4);
        assert_eq!(true.size_hint(), 1);
        assert_eq!("AAPL".size_hint(), 8);
    }

    /// Pins the hex-dump fallback rendering.
    #[test]
    fn test_hex_dump() {
        assert_eq!(LightningLogger::hex_dump(&[]), "[]");
        assert_eq!(LightningLogger::hex_dump(&[0xde, 0xad, 0x01]), "[de ad 01]");
    }
}