octo_flow/lib.rs
1//! Core library for the `octo-flow` GitHub event processing pipeline.
2//!
3//! `octo-flow` is a streaming CLI tool for processing **GitHub Archive
4//! (GHArchive) event datasets**, which are distributed as
5//! newline-delimited JSON (NDJSON).
6//!
7//! The library provides the core event-processing pipeline used by the CLI.
8//! It reads events from an input source, parses them using `serde_json`,
9//! optionally filters them by event type, and outputs selected fields
10//! in a tab-separated format.
11//!
12//! # Architecture
13//!
14//! The processing pipeline follows a streaming architecture designed to
15//! handle large datasets efficiently:
16//!
17//! ```text
18//! input source (file or stdin)
19//! ↓
20//! BufReader
21//! ↓
22//! line-by-line iterator
23//! ↓
24//! serde_json parsing
25//! ↓
26//! optional event filtering
27//! ↓
28//! tab-separated output
29//! ```
30//!
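//! This approach provides:
//!
//! - **bounded memory usage** — only the current line (one event) is held in
//!   memory at a time, so memory does not grow with the size of the input
//! - **fast startup time** — the input is never loaded up front
//! - **efficient processing of large NDJSON datasets**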
36//!
37//! # Example
38//!
39//! ```no_run
40//! use octo_flow::run;
41//!
42//! // Process a GitHub event file without filtering
43//! run("events.json".to_string(), None).unwrap();
44//!
45//! // Process only PushEvent events
46//! run("events.json".to_string(), Some("PushEvent".to_string())).unwrap();
47//! ```
48//!
49//! # CLI Usage
50//!
51//! ```bash
52//! octo-flow --input events.json --event PushEvent
53//! ```
54
use crate::github_event::GitHubEvent;
use std::{
    fs::File,
    io::{self, BufRead, BufReader, BufWriter, Read, Write},
};
use thiserror::Error;

pub mod github_event;
62pub mod github_event;
63
64/// Error type for the `octo-flow` processing pipeline.
65///
66/// This enum represents all possible errors that can occur while
67/// reading and processing GitHub event streams.
68#[derive(Debug, Error)]
69pub enum OctoFlowError {
70 /// Error while reading from the input source.
71 #[error(transparent)]
72 IoError(#[from] io::Error),
73
74 /// Error while parsing JSON event data.
75 #[error(transparent)]
76 ParseError(#[from] serde_json::Error),
77}
78
79/// Entry point for the event processing pipeline.
80///
81/// This function determines the input source and delegates processing
82/// to [`process_events`].
83///
84/// # Parameters
85///
86/// - `input` — Path to the NDJSON input file. Use `-` to read from `stdin`.
87/// - `event` — Optional GitHub event type filter.
88///
89/// # Returns
90///
91/// Returns the number of events that matched the filter and were printed.
92///
93/// # Errors
94///
95/// Returns an [`OctoFlowError`] if:
96///
97/// - the input file cannot be opened
98/// - an I/O error occurs while reading
99/// - a JSON parsing error occurs
100///
101/// # Example
102///
103/// ```no_run
104/// use octo_flow::run;
105///
106/// run("events.json".to_string(), None).unwrap();
107/// ```
108pub fn run(input: String, event: Option<String>) -> Result<usize, OctoFlowError> {
109 let input_source: Box<dyn Read> = if input == "-" {
110 Box::new(io::stdin())
111 } else {
112 Box::new(File::open(input)?)
113 };
114
115 process_events(input_source, event)
116}
117
118/// Process a stream of GitHub events.
119///
120/// This function reads newline-delimited JSON events from the provided
121/// input source, parses each event, optionally filters by event type,
122/// and prints selected event fields in **tab-separated format**.
123///
124/// The function processes events **line-by-line**, which enables efficient
125/// handling of large datasets without loading the entire file into memory.
126///
127/// # Parameters
128///
129/// - `source` — Input stream containing NDJSON event data.
130/// - `event` — Optional event type used to filter events.
131///
132/// # Output Format
133///
134/// Matching events are printed to standard output using the following format:
135///
136/// ```text
137/// id created_at actor_login repo_name event_type
138/// ```
139///
140/// Missing fields are replaced with `"n/a"`.
141///
142/// # Returns
143///
144/// Returns the number of events printed.
145///
146/// # Errors
147///
148/// Returns an [`OctoFlowError`] if:
149///
150/// - reading from the input source fails
151/// - JSON parsing fails
152///
153/// # Example
154///
155/// ```no_run
156/// use octo_flow::process_events;
157///
158/// let data = r#"{"id":"1","type":"PushEvent","actor":{"login":"a"},"repo":{"name":"r"},"created_at":"d"}"#;
159/// let reader = data.as_bytes();
160///
161/// let count = process_events(reader, None).unwrap();
162///
163/// assert_eq!(count, 1);
164/// ```
165pub fn process_events<R: Read>(source: R, event: Option<String>) -> Result<usize, OctoFlowError> {
166 let reader = BufReader::new(source);
167 let mut count = 0;
168
169 for line_result in reader.lines() {
170 let line = line_result?;
171 let github_event: GitHubEvent = serde_json::from_str(&line)?;
172
173 if event.as_deref().is_none_or(|f| f == github_event.kind) {
174 println!(
175 "{}\t{}\t{}\t{}\t{}",
176 github_event.id,
177 github_event.created_at,
178 github_event.actor.login.unwrap_or("n/a"),
179 github_event.repo.name.unwrap_or("n/a"),
180 github_event.kind
181 );
182
183 count += 1;
184 }
185 }
186
187 Ok(count)
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Two well-formed events of different types, one per line.
    const TWO_EVENTS: &str = "{\"id\":\"1\",\"type\":\"PushEvent\",\"actor\":{\"login\":\"a\"},\"repo\":{\"name\":\"r\"},\"created_at\":\"d\"}\n\
        {\"id\":\"2\",\"type\":\"WatchEvent\",\"actor\":{\"login\":\"b\"},\"repo\":{\"name\":\"r\"},\"created_at\":\"d\"}";

    #[test]
    fn test_valid_event_parsing_works() {
        let data = r#"{"id":"123","type":"PushEvent","actor":{"login":"coder"},"repo":{"name":"rust-lang/rust"},"created_at":"2026-03-12"}"#;

        let result = process_events(data.as_bytes(), None);

        assert!(matches!(result, Ok(1)), "unexpected result: {result:?}");
    }

    #[test]
    fn test_invalid_event_parsing_fails() {
        // Malformed JSON: no comma between the `repo` object and `created_at`.
        let data = r#"{"id":"123","type":"PushEvent","actor":{"login":"coder"},"repo":{"name":"rust-lang/rust"}"created_at":"2026-03-12"}"#;

        let result = process_events(data.as_bytes(), None);

        assert!(
            matches!(result, Err(OctoFlowError::ParseError(_))),
            "expected a parse error, got: {result:?}"
        );
    }

    #[test]
    fn test_filter_works() {
        let result = process_events(TWO_EVENTS.as_bytes(), Some("WatchEvent".to_string()));

        assert!(matches!(result, Ok(1)), "unexpected result: {result:?}");
    }

    #[test]
    fn test_no_filter_counts_every_event() {
        let result = process_events(TWO_EVENTS.as_bytes(), None);

        assert!(matches!(result, Ok(2)), "unexpected result: {result:?}");
    }

    #[test]
    fn test_filter_without_match_counts_zero() {
        let result = process_events(TWO_EVENTS.as_bytes(), Some("IssuesEvent".to_string()));

        assert!(matches!(result, Ok(0)), "unexpected result: {result:?}");
    }

    #[test]
    fn test_empty_input_counts_zero() {
        let result = process_events("".as_bytes(), None);

        assert!(matches!(result, Ok(0)), "unexpected result: {result:?}");
    }

    #[test]
    fn test_missing_input_file_is_io_error() {
        let result = run("this/path/does/not/exist/events.json".to_string(), None);

        assert!(
            matches!(result, Err(OctoFlowError::IoError(_))),
            "expected an I/O error, got: {result:?}"
        );
    }
}
226}