1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
//! LogSession — top-level orchestrator for a log viewing session.
use crate::filter::engine::FilterEngine;
use crate::parser::group::ParserGroup;
use crate::record::LogRecord;
use crate::store::LogStore;
use crate::traits::{LogLoader, LogProcessor, Result};
use crate::view::LogStoreView;
use rayon::prelude::*;
use std::sync::mpsc;
use std::sync::Arc;
use tracing::{info, instrument, warn};
/// Represents a registered loader paired with its parser group.
///
/// Slots are created by `LogSession::add_loader`. `run` and `run_parallel`
/// pass every line returned by `loader.load()` to `parser_group.parse(..)`.
struct LoaderSlot {
/// Source of raw log lines; also supplies the loader id via `info()`.
loader: Box<dyn LogLoader>,
/// Parser group applied to each line loaded from `loader`.
parser_group: ParserGroup,
}
/// The top-level session managing the full pipeline:
/// Load → Parse → Store → Process → Filter → Filtered View.
///
/// Filter changes go through one of two paths: a synchronous one
/// (`update_filter` + `apply_pending`) that stages a view in `pending_view`,
/// or a background one (`update_filter_async` + `poll_pending`) that delivers
/// the finished view through `async_pending`. Starting either path discards
/// whatever the other one had staged.
pub struct LogSession {
/// Registered loaders, each paired with the parser group for its lines.
loader_slots: Vec<LoaderSlot>,
/// Storage for the records that were parsed successfully.
store: LogStore,
/// Post-processors invoked by `run` / `run_parallel`, in registration order.
processors: Vec<Box<dyn LogProcessor>>,
/// Currently active view — serves TUI, always has valid results.
active_view: LogStoreView,
/// Pending view — created when filter changes, not yet applied (sync path).
pending_view: Option<LogStoreView>,
/// Receiver for async background filtering result.
/// When present, a background thread is filtering.
async_pending: Option<mpsc::Receiver<LogStoreView>>,
/// Records that failed parsing by all parsers in their group.
pub failing_parsing_logs: Vec<FailedLog>,
/// Auto-incrementing record ID counter.
/// Starts at 0 and is only ever incremented.
next_id: u64,
}
/// A log line that could not be parsed.
///
/// Collected in `LogSession::failing_parsing_logs` whenever a parser group
/// returns `None` for a line.
#[derive(Debug, Clone)]
pub struct FailedLog {
/// The original, unparsed line text.
pub raw: String,
/// Source label of the line; `run` / `run_parallel` fill this with the loader id.
pub source: String,
/// Id of the loader that produced the line.
pub loader_id: String,
}
impl LogSession {
/// Build a session with nothing registered yet.
///
/// The store, loader list, processor list and failure list start empty, the
/// active view wraps a fresh `FilterEngine`, no filter update is pending and
/// record ids start at 0.
pub fn new() -> Self {
    let store = LogStore::new();
    let active_view = LogStoreView::new(FilterEngine::new());
    Self {
        loader_slots: Vec::new(),
        store,
        processors: Vec::new(),
        active_view,
        pending_view: None,
        async_pending: None,
        failing_parsing_logs: Vec::new(),
        next_id: 0,
    }
}
/// Register a loader together with the parser group that will handle its lines.
///
/// The pair is appended to the session's slot list; nothing is loaded until
/// `run` or `run_parallel` is called.
#[instrument(skip(self, loader, parser_group))]
pub fn add_loader(&mut self, loader: Box<dyn LogLoader>, parser_group: ParserGroup) {
    let slot = LoaderSlot {
        loader,
        parser_group,
    };
    self.loader_slots.push(slot);
}
/// Register a post-processor.
///
/// Processors are invoked in registration order by `run` / `run_parallel`,
/// after loading and parsing and before the active view is refreshed.
pub fn add_processor(&mut self, processor: Box<dyn LogProcessor>) {
self.processors.push(processor);
}
/// Access the filter engine of the active view for adding/removing filters.
///
/// This method only hands out the mutable reference; it does not re-apply the
/// view. After modifying filters, call `refresh_active_view()` to re-run the
/// active view against the store. `apply_pending()` is not a substitute: it
/// only acts on a view staged by `update_filter()`. To swap in a whole new
/// engine instead, use `update_filter()` followed by `apply_pending()`, or
/// `update_filter_async()` followed by `poll_pending()`.
pub fn filter_engine_mut(&mut self) -> &mut FilterEngine {
self.active_view.filter_engine_mut()
}
/// Access the store.
///
/// Read-only borrow of the session's record store.
pub fn store(&self) -> &LogStore {
&self.store
}
/// Get the currently active view.
///
/// This is the view last installed by `new`, `apply_pending` or a successful
/// `poll_pending`; a staged or in-flight filter update is not visible here
/// until one of those swaps it in.
pub fn active_view(&self) -> &LogStoreView {
&self.active_view
}
/// Report whether any filter update is still outstanding.
///
/// Returns `true` when a synchronous pending view is staged or when an
/// async background filter has been started and not yet collected.
pub fn has_pending_view(&self) -> bool {
    let sync_staged = self.pending_view.is_some();
    let async_outstanding = self.async_pending.is_some();
    sync_staged || async_outstanding
}
/// Whether an async background filter is in progress.
///
/// True from `update_filter_async()` until `poll_pending()` collects the result
/// (or detects that the worker is gone), or until `update_filter()` cancels it.
pub fn is_filtering(&self) -> bool {
self.async_pending.is_some()
}
/// Stage a filter update for the synchronous dual-buffer workflow.
///
/// Any in-flight async filtering is abandoned by dropping its receiver, and a
/// new pending view built from `filter_engine` replaces whatever pending view
/// was staged before. The active view is untouched until `apply_pending()`
/// is called.
#[instrument(skip(self, filter_engine))]
pub fn update_filter(&mut self, filter_engine: FilterEngine) {
    // Dropping the receiver means a background result, if any, is discarded.
    self.async_pending = None;
    let staged = LogStoreView::new(filter_engine);
    self.pending_view = Some(staged);
}
/// Kick off a filter update on a background thread.
///
/// Any staged synchronous view is dropped first. The store's records are then
/// snapshotted as shared `Arc` handles (reference-count bumps, no record is
/// deep-copied) and moved into a freshly spawned thread, which builds a view
/// for `filter_engine` and sends it back over a channel. Call `poll_pending()`
/// to pick up the result and swap views.
///
/// Calling this again before the previous run was collected replaces the
/// stored receiver, so the earlier thread's result is discarded when it
/// tries to send.
#[instrument(skip(self, filter_engine))]
pub fn update_filter_async(&mut self, filter_engine: FilterEngine) {
    // The async request supersedes any synchronous pending view.
    self.pending_view = None;

    // Snapshot the store by cloning the `Arc` handles only.
    let snapshot: Arc<[Arc<LogRecord>]> = self.store.iter_arc().map(Arc::clone).collect();
    let snapshot_len = snapshot.len();

    let (result_tx, result_rx) = mpsc::channel();
    self.async_pending = Some(result_rx);

    std::thread::spawn(move || {
        let mut filtered = LogStoreView::new(filter_engine);
        filtered.apply_from_records(snapshot.iter().map(|rec| &**rec), snapshot_len);
        // A send error only means the receiver was dropped (request cancelled).
        let _ = result_tx.send(filtered);
    });
}
/// Poll for async filtering completion. If the background thread finished,
/// swap the completed view into active_view and return `true`.
/// Returns `false` if no async work is pending or it hasn't completed yet.
#[instrument(skip(self))]
pub fn poll_pending(&mut self) -> bool {
if let Some(ref rx) = self.async_pending {
match rx.try_recv() {
Ok(view) => {
self.active_view = view;
self.async_pending = None;
true
}
Err(mpsc::TryRecvError::Empty) => false,
Err(mpsc::TryRecvError::Disconnected) => {
// Thread finished but send failed (shouldn't happen normally)
self.async_pending = None;
false
}
}
} else {
false
}
}
/// Finish a synchronous filter update.
///
/// Takes the staged pending view (if any), applies it against the store and
/// installs it as the active view. Without a staged view this does nothing;
/// in-flight async filtering is not touched by this method.
#[instrument(skip(self))]
pub fn apply_pending(&mut self) {
    let mut staged = match self.pending_view.take() {
        Some(view) => view,
        None => return,
    };
    staged.apply(&self.store);
    self.active_view = staged;
}
/// Re-apply the active view's filter against the store.
///
/// Use after modifying filters via `filter_engine_mut()`.
/// Pending (sync or async) filter updates are not affected by this call.
#[instrument(skip(self))]
pub fn refresh_active_view(&mut self) {
self.active_view.apply(&self.store);
}
/// Execute the full pipeline sequentially: Load → Parse → Store → Process → Filter.
///
/// Loader slots are handled one after another in registration order. Every
/// loaded line consumes one record id, including lines that fail to parse, so
/// ids of stored records may have gaps. Lines that no parser accepts are
/// appended to `failing_parsing_logs`. If a parsed record comes back with an
/// empty `raw`, the original line is stored as its `raw`.
///
/// Returns the filtered view (indices into the store) after re-applying the
/// active view.
///
/// # Errors
///
/// Returns the first error reported by a loader's `load()` or by a
/// processor's `process()`. Records inserted before the error stay in the
/// store, and the active view is not refreshed in that case.
#[instrument(skip(self))]
pub fn run(&mut self) -> Result<Vec<usize>> {
    // 1. Load + Parse
    for slot in &mut self.loader_slots {
        // Only the loader id is needed below, so copy just that field rather
        // than cloning the whole info value.
        let loader_id = slot.loader.info().id.clone();
        let lines = slot.loader.load()?;
        info!(loader_id = %loader_id, line_count = lines.len(), "loaded lines from source");
        // The loaded lines are owned here: consume them so each raw line can
        // be moved into its record / failure entry instead of being cloned.
        for line in lines {
            let id = self.next_id;
            self.next_id += 1;
            // The loader id doubles as the source label.
            match slot.parser_group.parse(&line, &loader_id, &loader_id, id) {
                Some(mut record) => {
                    if record.raw.is_empty() {
                        record.raw = line;
                    }
                    self.store.insert(record);
                }
                None => self.failing_parsing_logs.push(FailedLog {
                    raw: line,
                    source: loader_id.clone(),
                    loader_id: loader_id.clone(),
                }),
            }
        }
    }
    // 2. Process (snapshot the store only if processors exist)
    if !self.processors.is_empty() {
        let records: Vec<LogRecord> = self.store.iter().cloned().collect();
        for processor in &self.processors {
            processor.process(&records)?;
        }
    }
    // 3. Apply active view filter
    self.active_view.apply(&self.store);
    info!(
        store_size = self.store.len(),
        filtered_size = self.active_view.len(),
        "session run complete"
    );
    Ok(self.active_view.indices().to_vec())
}
/// Get the filtered view based on current active view's cache.
///
/// Returns an owned copy of the active view's indices; no filtering is re-run.
pub fn filtered_view(&self) -> Vec<usize> {
self.active_view.indices().to_vec()
}
/// Execute the pipeline with parallel loading and parsing across loader slots.
///
/// Each slot is loaded and parsed on the rayon pool into its own buffers;
/// the buffers are then merged into the store sequentially, in the order the
/// results were collected. While parsing, a line's index within its loader is
/// passed to the parser as a provisional id. Final ids are assigned during
/// the merge, to stored records only — unlike `run`, lines that fail to parse
/// do not consume an id here.
///
/// Returns the filtered view (indices into the store) after re-applying the
/// active view.
///
/// # Errors
///
/// All slots are processed before any error is reported. Merging stops at the
/// first slot whose `load()` failed and that error is returned; records of
/// slots merged before it stay in the store. A processor error is returned
/// likewise. In both cases the active view is not refreshed.
#[instrument(skip(self))]
pub fn run_parallel(&mut self) -> Result<Vec<usize>> {
    // 1. Load + Parse in parallel
    let results: Vec<Result<(Vec<LogRecord>, Vec<FailedLog>)>> = self
        .loader_slots
        .par_iter_mut()
        .map(|slot| {
            // Only the loader id is needed, so copy just that field rather
            // than cloning the whole info value.
            let loader_id = slot.loader.info().id.clone();
            let lines = slot.loader.load()?;
            // At most one record per line.
            let mut records = Vec::with_capacity(lines.len());
            let mut failures = Vec::new();
            // Consume the owned lines so raw text is moved, not cloned.
            for (i, line) in lines.into_iter().enumerate() {
                // `i as u64` is only a provisional id; it is overwritten
                // during the merge below.
                match slot.parser_group.parse(&line, &loader_id, &loader_id, i as u64) {
                    Some(mut record) => {
                        if record.raw.is_empty() {
                            record.raw = line;
                        }
                        records.push(record);
                    }
                    None => failures.push(FailedLog {
                        raw: line,
                        source: loader_id.clone(),
                        loader_id: loader_id.clone(),
                    }),
                }
            }
            Ok((records, failures))
        })
        .collect();
    // 2. Merge results sequentially, assigning the final record ids
    for result in results {
        let (records, failures) = result?;
        for mut record in records {
            record.id = self.next_id;
            self.next_id += 1;
            self.store.insert(record);
        }
        self.failing_parsing_logs.extend(failures);
    }
    // 3. Process (snapshot the store only if processors exist)
    if !self.processors.is_empty() {
        let records: Vec<LogRecord> = self.store.iter().cloned().collect();
        for processor in &self.processors {
            processor.process(&records)?;
        }
    }
    // 4. Apply active view filter
    self.active_view.apply(&self.store);
    info!(
        store_size = self.store.len(),
        filtered_size = self.active_view.len(),
        failures = self.failing_parsing_logs.len(),
        "parallel session run complete"
    );
    Ok(self.active_view.indices().to_vec())
}
}
/// `Default` produces the same empty session as `LogSession::new()`.
impl Default for LogSession {
fn default() -> Self {
Self::new()
}
}
// Unit tests for this module are kept in `session_tests.rs` (located via the
// `#[path]` attribute) and are compiled only for test builds.
#[cfg(test)]
#[path = "session_tests.rs"]
mod session_tests;