1use lingxia_provider::{BoxFuture, ProviderError};
2use serde::Serialize;
3use std::cell::Cell;
4use std::collections::{HashSet, VecDeque};
5use std::io;
6use std::sync::{Arc, Mutex, OnceLock};
7use std::time::{SystemTime, UNIX_EPOCH};
8use tokio::sync::broadcast;
9use tracing::field::{Field, Visit};
10use tracing_subscriber::{Registry, layer::Layer, prelude::*};
11
/// Default capacity of the live broadcast channel (see `LogBufferConfig::live_capacity`).
pub const DEFAULT_LOG_LIVE_CAPACITY: usize = 1_024;
/// Default number of entries retained in the history ring (see `LogBufferConfig::history_capacity`).
pub const DEFAULT_LOG_HISTORY_CAPACITY: usize = 2_048;
/// Default number of recent entries replayed by `LogManager::attach_default`.
pub const DEFAULT_LOG_STREAM_RECENT_LIMIT: usize = 500;
18
19thread_local! {
20 static LOG_DISPATCH_GUARD: Cell<bool> = const { Cell::new(false) };
21}
22
23static GLOBAL_LOG_MANAGER: OnceLock<Arc<LogManager>> = OnceLock::new();
24static TRACING_SUBSCRIBER_READY: OnceLock<()> = OnceLock::new();
25static LOG_PROVIDER: OnceLock<Box<dyn LogProvider>> = OnceLock::new();
26static NO_OP_LOG_PROVIDER: NoOpLogProvider = NoOpLogProvider;
27
28#[derive(Debug, Clone, Copy, Serialize)]
30#[serde(rename_all = "snake_case")]
31pub enum LogLevel {
32 Verbose,
33 Debug,
34 Info,
35 Warn,
36 Error,
37}
38
39#[derive(Debug, Clone, Copy, Serialize)]
40#[serde(rename_all = "snake_case")]
41pub enum LogTag {
42 Native,
43 WebViewConsole,
44 LxAppServiceConsole,
45}
46
47impl LogTag {
48 pub fn as_str(self) -> &'static str {
49 match self {
50 Self::Native => "Native",
51 Self::WebViewConsole => "JSView",
52 Self::LxAppServiceConsole => "JSService",
53 }
54 }
55}
56
57#[derive(Debug, Clone, Serialize)]
59pub struct LogMessage {
60 pub timestamp_ms: u64,
61 pub tag: LogTag,
62 pub level: LogLevel,
63 pub appid: Option<String>,
64 pub path: Option<String>,
65 pub target: Option<String>,
66 pub message: String,
67}
68
69impl Default for LogMessage {
70 fn default() -> Self {
71 Self {
72 timestamp_ms: 0,
73 tag: LogTag::Native,
74 level: LogLevel::Info,
75 appid: None,
76 path: None,
77 target: None,
78 message: String::new(),
79 }
80 }
81}
82
83impl LogMessage {
84 pub fn new(tag: LogTag, message: impl Into<String>) -> Self {
85 Self {
86 timestamp_ms: now_timestamp_ms(),
87 tag,
88 level: LogLevel::Info,
89 appid: None,
90 path: None,
91 target: None,
92 message: message.into(),
93 }
94 }
95
96 pub fn with_level(mut self, level: LogLevel) -> Self {
97 self.level = level;
98 self
99 }
100
101 pub fn with_appid(mut self, appid: impl Into<String>) -> Self {
102 self.appid = normalize_optional_string(Some(appid.into()));
103 self
104 }
105
106 pub fn with_path(mut self, path: impl Into<String>) -> Self {
107 self.path = normalize_optional_string(Some(path.into()));
108 self
109 }
110
111 pub fn with_target(mut self, target: impl Into<String>) -> Self {
112 self.target = normalize_optional_string(Some(target.into()));
113 self
114 }
115}
116
/// A compressed bundle of log entries together with its descriptive metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedLogArchive {
    /// Suggested file name for the archive.
    pub file_name: String,
    /// MIME type of `bytes`.
    pub content_type: &'static str,
    /// Short description of how `bytes` is encoded.
    pub encoding: &'static str,
    /// Number of log entries contained in the archive.
    pub entry_count: usize,
    /// Distinct app ids found in the entries, in order of first appearance.
    pub lxapp_ids: Vec<String>,
    /// The encoded archive payload.
    pub bytes: Vec<u8>,
}
127
/// Metadata describing a collected log archive, without the payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CollectedLogArchiveInfo {
    /// Suggested file name for the archive.
    pub file_name: String,
    /// MIME type of the archive payload.
    pub content_type: &'static str,
    /// Short description of how the payload is encoded.
    pub encoding: &'static str,
    /// Number of log entries contained in the archive.
    pub entry_count: usize,
    /// Distinct app ids found in the entries, in order of first appearance.
    pub lxapp_ids: Vec<String>,
}
137
138impl CollectedLogArchive {
139 pub fn from_entries(entries: &[LogMessage]) -> io::Result<Self> {
140 let mut lxapp_ids = Vec::new();
141 let mut seen_lxapp_ids = HashSet::new();
142 let mut jsonl = Vec::new();
143 for entry in entries {
144 if let Some(appid) = entry.appid.as_deref()
145 && seen_lxapp_ids.insert(appid.to_string())
146 {
147 lxapp_ids.push(appid.to_string());
148 }
149 serde_json::to_writer(&mut jsonl, entry)
150 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
151 jsonl.push(b'\n');
152 }
153
154 let bytes = zstd::stream::encode_all(io::Cursor::new(jsonl), 3)?;
155 Ok(Self {
156 file_name: format!("lingxia-logs-{}.jsonl.zst", now_timestamp_ms()),
157 content_type: "application/zstd",
158 encoding: "jsonl+zstd",
159 entry_count: entries.len(),
160 lxapp_ids,
161 bytes,
162 })
163 }
164
165 pub fn info(&self) -> CollectedLogArchiveInfo {
166 CollectedLogArchiveInfo {
167 file_name: self.file_name.clone(),
168 content_type: self.content_type,
169 encoding: self.encoding,
170 entry_count: self.entry_count,
171 lxapp_ids: self.lxapp_ids.clone(),
172 }
173 }
174}
175
176pub struct AttachedLogStream {
178 pub recent: Vec<LogMessage>,
179 pub receiver: broadcast::Receiver<LogMessage>,
180}
181
182impl AttachedLogStream {
183 pub fn recent(&self) -> &[LogMessage] {
185 &self.recent
186 }
187
188 pub fn into_parts(self) -> (Vec<LogMessage>, broadcast::Receiver<LogMessage>) {
190 (self.recent, self.receiver)
191 }
192
193 pub async fn recv(&mut self) -> Result<LogMessage, broadcast::error::RecvError> {
195 self.receiver.recv().await
196 }
197
198 pub fn try_recv(&mut self) -> Result<LogMessage, broadcast::error::TryRecvError> {
200 self.receiver.try_recv()
201 }
202}
203
/// Sizing parameters for a `LogBuffer`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct LogBufferConfig {
    /// Capacity of the live broadcast channel.
    pub live_capacity: usize,
    /// Maximum number of entries retained in the history ring.
    pub history_capacity: usize,
}
209
210impl Default for LogBufferConfig {
211 fn default() -> Self {
212 Self {
213 live_capacity: DEFAULT_LOG_LIVE_CAPACITY,
214 history_capacity: DEFAULT_LOG_HISTORY_CAPACITY,
215 }
216 }
217}
218
219pub struct LogBuffer {
221 sender: broadcast::Sender<LogMessage>,
222 history: Mutex<VecDeque<LogMessage>>,
223 config: LogBufferConfig,
224}
225
226impl LogBuffer {
227 pub fn new(config: LogBufferConfig) -> Self {
228 let (sender, _) = broadcast::channel(config.live_capacity.max(1));
229 Self {
230 sender,
231 history: Mutex::new(VecDeque::with_capacity(config.history_capacity.max(1))),
232 config,
233 }
234 }
235
236 pub fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
237 self.sender.subscribe()
238 }
239
240 pub fn attach(&self, recent_limit: usize) -> AttachedLogStream {
241 let history = self
242 .history
243 .lock()
244 .unwrap_or_else(|poisoned| poisoned.into_inner());
245 let receiver = self.sender.subscribe();
246 let recent_limit = clamp_recent_limit(recent_limit, history.len());
247 let recent = history
248 .iter()
249 .skip(history.len().saturating_sub(recent_limit))
250 .cloned()
251 .collect();
252 AttachedLogStream { recent, receiver }
253 }
254
255 pub fn snapshot_recent(&self, limit: usize) -> Vec<LogMessage> {
256 let history = self
257 .history
258 .lock()
259 .unwrap_or_else(|poisoned| poisoned.into_inner());
260 let limit = clamp_recent_limit(limit, history.len());
261 history
262 .iter()
263 .skip(history.len().saturating_sub(limit))
264 .cloned()
265 .collect()
266 }
267
268 pub fn collect_archive(&self, limit: usize) -> io::Result<CollectedLogArchive> {
269 let entries = self.snapshot_recent(limit);
270 CollectedLogArchive::from_entries(&entries)
271 }
272
273 pub fn push(&self, message: LogMessage) {
274 let entry = message.clone();
275 {
276 let mut history = self
277 .history
278 .lock()
279 .unwrap_or_else(|poisoned| poisoned.into_inner());
280 if history.len() >= self.config.history_capacity.max(1) {
281 history.pop_front();
282 }
283 history.push_back(entry);
284 }
285
286 let _ = self.sender.send(message);
287 }
288}
289
/// Resolves a requested entry count against the number of entries available.
///
/// `0` is a sentinel for "everything"; any other request is capped at `available`.
fn clamp_recent_limit(requested: usize, available: usize) -> usize {
    match requested {
        0 => available,
        wanted => wanted.min(available),
    }
}
297
/// Trims surrounding whitespace and maps blank strings to `None`.
///
/// Returns `None` for `None`, empty and whitespace-only input; otherwise returns the
/// trimmed text. When the input has no surrounding whitespace the original `String`
/// is handed back as-is, so no new allocation is made in the common case.
pub fn normalize_optional_string(value: Option<String>) -> Option<String> {
    let value = value?;
    let trimmed = value.trim();
    if trimmed.is_empty() {
        None
    } else if trimmed.len() == value.len() {
        // Nothing was trimmed: reuse the existing allocation.
        Some(value)
    } else {
        Some(trimmed.to_owned())
    }
}
308
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock is set before the epoch, and saturates at
/// `u64::MAX` instead of silently truncating if the value does not fit in 64 bits.
pub fn now_timestamp_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
315
316pub trait LogProvider: Send + Sync + 'static {
326 fn on_log(&self, _message: &LogMessage) {}
332
333 fn upload_collected_logs<'a>(
335 &'a self,
336 _archive: CollectedLogArchive,
337 ) -> BoxFuture<'a, Result<(), ProviderError>> {
338 Box::pin(async { Ok(()) })
339 }
340}
341
342struct NoOpLogProvider;
343
344impl LogProvider for NoOpLogProvider {}
345
346#[derive(Debug, thiserror::Error)]
347pub enum LogStreamError {
348 #[error("log manager is not initialized")]
349 NotInitialized,
350}
351
352pub struct LogManager {
358 buffer: LogBuffer,
359 logger: Box<dyn Fn(&LogMessage) + Send + Sync>,
360}
361
/// `tracing_subscriber` layer that forwards tracing events to the global `LogManager`.
///
/// The layer is stateless, so it is cheap to copy and can be created with `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct LogTracingLayer;
363
364struct DispatchGuardReset;
365
366impl Drop for DispatchGuardReset {
367 fn drop(&mut self) {
368 LOG_DISPATCH_GUARD.with(|guard| guard.set(false));
369 }
370}
371
372impl LogManager {
373 pub fn init<F>(logger: F) -> Arc<Self>
375 where
376 F: Fn(&LogMessage) + Send + Sync + 'static,
377 {
378 let manager = GLOBAL_LOG_MANAGER
379 .get_or_init(|| {
380 Arc::new(LogManager {
381 buffer: LogBuffer::new(LogBufferConfig::default()),
382 logger: Box::new(logger),
383 })
384 })
385 .clone();
386
387 init_tracing();
390
391 manager
392 }
393
394 pub fn get() -> Option<Arc<Self>> {
396 GLOBAL_LOG_MANAGER.get().cloned()
397 }
398
399 pub fn subscribe(&self) -> broadcast::Receiver<LogMessage> {
401 self.buffer.subscribe()
402 }
403
404 pub fn attach(&self, recent_limit: usize) -> AttachedLogStream {
409 self.buffer.attach(recent_limit)
410 }
411
412 pub fn attach_default(&self) -> AttachedLogStream {
414 self.attach(DEFAULT_LOG_STREAM_RECENT_LIMIT)
415 }
416
417 pub fn print_to_native(&self, message: &LogMessage) {
419 (self.logger)(message);
420 }
421
422 pub fn snapshot_recent(&self, limit: usize) -> Vec<LogMessage> {
424 self.buffer.snapshot_recent(limit)
425 }
426
427 pub fn collect_archive(&self, limit: usize) -> io::Result<CollectedLogArchive> {
429 self.buffer.collect_archive(limit)
430 }
431
432 fn dispatch(&self, message: LogMessage) {
433 let should_dispatch = LOG_DISPATCH_GUARD.with(|guard| {
434 if guard.get() {
435 false
436 } else {
437 guard.set(true);
438 true
439 }
440 });
441
442 if !should_dispatch {
443 return;
444 }
445
446 let _reset_guard = DispatchGuardReset;
447 self.buffer.push(message.clone());
448 get_log_provider().on_log(&message);
449 (self.logger)(&message);
450 }
451}
452
453pub fn register_log_provider(provider: Box<dyn LogProvider>) {
455 if LOG_PROVIDER.set(provider).is_err() {
456 panic!("register_log_provider called more than once");
457 }
458}
459
460fn get_log_provider() -> &'static dyn LogProvider {
461 LOG_PROVIDER
462 .get()
463 .map(|b| b.as_ref())
464 .unwrap_or(&NO_OP_LOG_PROVIDER)
465}
466
467fn init_tracing() {
469 if TRACING_SUBSCRIBER_READY.get().is_some() {
470 return;
471 }
472
473 let subscriber = Registry::default().with(tracing_layer());
474 if tracing::subscriber::set_global_default(subscriber).is_ok() {
475 let _ = TRACING_SUBSCRIBER_READY.set(());
476 }
477}
478
479pub fn tracing_layer() -> LogTracingLayer {
480 LogTracingLayer
481}
482
483pub fn attach_log_stream(recent_limit: usize) -> Result<AttachedLogStream, LogStreamError> {
489 let manager = LogManager::get().ok_or(LogStreamError::NotInitialized)?;
490 Ok(manager.attach(recent_limit))
491}
492
493pub fn attach_log_stream_default() -> Result<AttachedLogStream, LogStreamError> {
495 let manager = LogManager::get().ok_or(LogStreamError::NotInitialized)?;
496 Ok(manager.attach_default())
497}
498
499pub fn log(tag: LogTag, level: LogLevel, message: impl std::fmt::Display) {
501 let mut log_message = new_log_message(tag, message);
502 log_message.level = level;
503 emit_log_message(log_message);
504}
505
506pub async fn upload_collected_logs(limit: usize) -> Result<CollectedLogArchiveInfo, ProviderError> {
512 let manager = LogManager::get()
513 .ok_or_else(|| ProviderError::internal("log manager is not initialized"))?;
514 let archive = manager
515 .collect_archive(limit)
516 .map_err(|err| ProviderError::internal(format!("collect logs failed: {err}")))?;
517 let metadata = archive.info();
518 get_log_provider().upload_collected_logs(archive).await?;
519 Ok(metadata)
520}
521
522pub struct LogBuilder {
524 message: LogMessage,
525}
526
527impl LogBuilder {
528 pub fn new(tag: LogTag, message: impl std::fmt::Display) -> Self {
529 Self {
530 message: new_log_message(tag, message),
531 }
532 }
533
534 pub fn with_appid(mut self, appid: impl Into<String>) -> Self {
535 self.message.appid = normalize_optional_string(Some(appid.into()));
536 self
537 }
538
539 pub fn with_path(mut self, path: impl Into<String>) -> Self {
540 self.message.path = normalize_optional_string(Some(path.into()));
541 self
542 }
543
544 pub fn with_level(mut self, level: LogLevel) -> Self {
545 self.message.level = level;
546 self
547 }
548
549 pub fn with_target(mut self, target: impl Into<String>) -> Self {
550 self.message.target = normalize_optional_string(Some(target.into()));
551 self
552 }
553}
554
555impl Drop for LogBuilder {
556 fn drop(&mut self) {
557 emit_log_message(std::mem::take(&mut self.message));
558 }
559}
560
561fn emit_log_message(message: LogMessage) {
562 emit_tracing_event(&message);
563
564 if let Some(manager) = GLOBAL_LOG_MANAGER.get() {
565 manager.dispatch(message);
566 }
567}
568
569fn emit_tracing_event(message: &LogMessage) {
570 let appid = message.appid.as_deref().unwrap_or("");
571 let path = message.path.as_deref().unwrap_or("");
572 let target = message.target.as_deref().unwrap_or("");
573 let log_tag = message.tag.as_str();
574
575 macro_rules! emit {
576 ($level:expr) => {
577 tracing::event!(
578 target: "lingxia.log",
579 $level,
580 lx_emitted = true,
581 log_tag,
582 appid,
583 path,
584 target,
585 message = %message.message
586 );
587 };
588 }
589
590 match message.level {
591 LogLevel::Verbose => {
592 emit!(tracing::Level::TRACE);
593 }
594 LogLevel::Debug => {
595 emit!(tracing::Level::DEBUG);
596 }
597 LogLevel::Info => {
598 emit!(tracing::Level::INFO);
599 }
600 LogLevel::Warn => {
601 emit!(tracing::Level::WARN);
602 }
603 LogLevel::Error => {
604 emit!(tracing::Level::ERROR);
605 }
606 }
607}
608
609fn log_level_from_tracing_level(level: &tracing::Level) -> LogLevel {
610 match *level {
611 tracing::Level::ERROR => LogLevel::Error,
612 tracing::Level::WARN => LogLevel::Warn,
613 tracing::Level::INFO => LogLevel::Info,
614 tracing::Level::DEBUG => LogLevel::Debug,
615 tracing::Level::TRACE => LogLevel::Verbose,
616 }
617}
618
619fn new_log_message(tag: LogTag, message: impl std::fmt::Display) -> LogMessage {
620 LogMessage::new(tag, message.to_string())
621}
622
623fn log_tag_from_str(value: &str) -> Option<LogTag> {
624 match value {
625 "Native" => Some(LogTag::Native),
626 "JSView" => Some(LogTag::WebViewConsole),
627 "JSService" => Some(LogTag::LxAppServiceConsole),
628 _ => None,
629 }
630}
631
/// Collects the string form of the tracing event fields the log layer cares about.
///
/// Each field stays `None` unless the event recorded a field with the matching name.
#[derive(Default)]
struct TracingEventVisitor {
    /// Value of the `message` field.
    message: Option<String>,
    /// Value of the `appid` field.
    appid: Option<String>,
    /// Value of the `path` field.
    path: Option<String>,
    /// Value of the `target` field (distinct from the event's metadata target).
    target_field: Option<String>,
    /// Value of the `log_tag` field.
    log_tag: Option<String>,
    /// Value of the `namespace` field; used as a fallback for `appid`.
    namespace: Option<String>,
    /// Value of the `scope` field.
    scope: Option<String>,
    /// Value of the `lx_emitted` marker field.
    lx_emitted: Option<String>,
}
643
644impl TracingEventVisitor {
645 fn record_value(&mut self, field: &Field, value: String) {
646 match field.name() {
647 "message" => self.message = Some(value),
648 "appid" => self.appid = Some(value),
649 "path" => self.path = Some(value),
650 "target" => self.target_field = Some(value),
651 "log_tag" => self.log_tag = Some(value),
652 "namespace" => self.namespace = Some(value),
653 "scope" => self.scope = Some(value),
654 "lx_emitted" => self.lx_emitted = Some(value),
655 _ => {}
656 }
657 }
658}
659
660impl Visit for TracingEventVisitor {
661 fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
662 self.record_value(field, format!("{value:?}"));
663 }
664
665 fn record_str(&mut self, field: &Field, value: &str) {
666 self.record_value(field, value.to_string());
667 }
668
669 fn record_i64(&mut self, field: &Field, value: i64) {
670 self.record_value(field, value.to_string());
671 }
672
673 fn record_u64(&mut self, field: &Field, value: u64) {
674 self.record_value(field, value.to_string());
675 }
676
677 fn record_bool(&mut self, field: &Field, value: bool) {
678 self.record_value(field, value.to_string());
679 }
680}
681
682impl<S> Layer<S> for LogTracingLayer
683where
684 S: tracing::Subscriber,
685{
686 fn on_event(
687 &self,
688 event: &tracing::Event<'_>,
689 _ctx: tracing_subscriber::layer::Context<'_, S>,
690 ) {
691 let Some(manager) = LogManager::get() else {
692 return;
693 };
694
695 let metadata = event.metadata();
696 let mut visitor = TracingEventVisitor::default();
697 event.record(&mut visitor);
698
699 if visitor.lx_emitted.as_deref() == Some("true") {
700 return;
701 }
702
703 let tag = if metadata.target() == "rong.js.console" {
704 match visitor.scope.as_deref() {
705 Some("appservice") => LogTag::LxAppServiceConsole,
706 _ => LogTag::Native,
707 }
708 } else {
709 visitor
710 .log_tag
711 .as_deref()
712 .and_then(log_tag_from_str)
713 .unwrap_or(LogTag::Native)
714 };
715
716 let target = if metadata.target() == "rong.js.console" {
717 visitor.target_field
718 } else {
719 visitor
720 .target_field
721 .or_else(|| Some(metadata.target().to_string()))
722 };
723
724 let message = LogMessage {
725 timestamp_ms: now_timestamp_ms(),
726 tag,
727 level: log_level_from_tracing_level(metadata.level()),
728 appid: normalize_optional_string(visitor.appid.or(visitor.namespace)),
729 path: normalize_optional_string(visitor.path),
730 target: normalize_optional_string(target),
731 message: visitor
732 .message
733 .unwrap_or_else(|| metadata.name().to_string()),
734 };
735
736 manager.dispatch(message);
737 }
738}