Skip to main content

octo_flow/
lib.rs

1//! Core library for the `octo-flow` GitHub event processing pipeline.
2//!
3//! `octo-flow` is a streaming CLI tool for processing **GitHub Archive
4//! (GHArchive) event datasets**, which are distributed as
5//! newline-delimited JSON (NDJSON).
6//!
7//! The library provides the core event-processing pipeline used by the CLI.
8//! It reads events from an input source, parses them using `serde_json`,
9//! optionally filters them by event type, and outputs selected fields
10//! in a tab-separated format.
11//!
12//! # Architecture
13//!
14//! The processing pipeline follows a streaming architecture designed to
15//! handle large datasets efficiently:
16//!
17//! ```text
18//! input source (file or stdin)
19//!        ↓
20//!     BufReader
21//!        ↓
22//!   line-by-line iterator
23//!        ↓
24//!   serde_json parsing
25//!        ↓
26//!   optional event filtering
27//!        ↓
28//!   tab-separated output
29//! ```
30//!
31//! This approach ensures:
32//!
33//! - **constant memory usage**
34//! - **fast startup time**
35//! - **efficient processing of large NDJSON datasets**
36//!
37//! # Example
38//!
39//! ```no_run
40//! use octo_flow::run;
41//!
42//! // Process a GitHub event file without filtering
43//! run("events.json".to_string(), None).unwrap();
44//!
45//! // Process only PushEvent events
46//! run("events.json".to_string(), Some("PushEvent".to_string())).unwrap();
47//! ```
48//!
49//! # CLI Usage
50//!
51//! ```bash
52//! octo-flow --input events.json --event PushEvent
53//! ```
54
55use crate::github_event::GitHubEvent;
56use std::{
57    fs::File,
58    io::{self, BufRead, BufReader, Read},
59};
60use thiserror::Error;
61
62pub mod github_event;
63
64/// Error type for the `octo-flow` processing pipeline.
65///
66/// This enum represents all possible errors that can occur while
67/// reading and processing GitHub event streams.
68#[derive(Debug, Error)]
69pub enum OctoFlowError {
70    /// Error while reading from the input source.
71    #[error(transparent)]
72    IoError(#[from] io::Error),
73
74    /// Error while parsing JSON event data.
75    #[error(transparent)]
76    ParseError(#[from] serde_json::Error),
77}
78
79/// Entry point for the event processing pipeline.
80///
81/// This function determines the input source and delegates processing
82/// to [`process_events`].
83///
84/// # Parameters
85///
86/// - `input` — Path to the NDJSON input file. Use `-` to read from `stdin`.
87/// - `event` — Optional GitHub event type filter.
88///
89/// # Returns
90///
91/// Returns the number of events that matched the filter and were printed.
92///
93/// # Errors
94///
95/// Returns an [`OctoFlowError`] if:
96///
97/// - the input file cannot be opened
98/// - an I/O error occurs while reading
99/// - a JSON parsing error occurs
100///
101/// # Example
102///
103/// ```no_run
104/// use octo_flow::run;
105///
106/// run("events.json".to_string(), None).unwrap();
107/// ```
108pub fn run(input: String, event: Option<String>) -> Result<usize, OctoFlowError> {
109    let input_source: Box<dyn Read> = if input == "-" {
110        Box::new(io::stdin())
111    } else {
112        Box::new(File::open(input)?)
113    };
114
115    process_events(input_source, event)
116}
117
118/// Process a stream of GitHub events.
119///
120/// This function reads newline-delimited JSON events from the provided
121/// input source, parses each event, optionally filters by event type,
122/// and prints selected event fields in **tab-separated format**.
123///
124/// The function processes events **line-by-line**, which enables efficient
125/// handling of large datasets without loading the entire file into memory.
126///
127/// # Parameters
128///
129/// - `source` — Input stream containing NDJSON event data.
130/// - `event` — Optional event type used to filter events.
131///
132/// # Output Format
133///
134/// Matching events are printed to standard output using the following format:
135///
136/// ```text
137/// id    created_at    actor_login    repo_name    event_type
138/// ```
139///
140/// Missing fields are replaced with `"n/a"`.
141///
142/// # Returns
143///
144/// Returns the number of events printed.
145///
146/// # Errors
147///
148/// Returns an [`OctoFlowError`] if:
149///
150/// - reading from the input source fails
151/// - JSON parsing fails
152///
153/// # Example
154///
155/// ```no_run
156/// use octo_flow::process_events;
157///
158/// let data = r#"{"id":"1","type":"PushEvent","actor":{"login":"a"},"repo":{"name":"r"},"created_at":"d"}"#;
159/// let reader = data.as_bytes();
160///
161/// let count = process_events(reader, None).unwrap();
162///
163/// assert_eq!(count, 1);
164/// ```
165pub fn process_events<R: Read>(source: R, event: Option<String>) -> Result<usize, OctoFlowError> {
166    let reader = BufReader::new(source);
167    let mut count = 0;
168
169    for line_result in reader.lines() {
170        let line = line_result?;
171        let github_event: GitHubEvent = serde_json::from_str(&line)?;
172
173        if event.as_deref().is_none_or(|f| f == github_event.kind) {
174            println!(
175                "{}\t{}\t{}\t{}\t{}",
176                github_event.id,
177                github_event.created_at,
178                github_event.actor.login.unwrap_or("n/a"),
179                github_event.repo.name.unwrap_or("n/a"),
180                github_event.kind
181            );
182
183            count += 1;
184        }
185    }
186
187    Ok(count)
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// A `PushEvent` followed by a `WatchEvent`, one JSON object per line.
    const PUSH_THEN_WATCH: &str = concat!(
        r#"{"id":"1","type":"PushEvent","actor":{"login":"a"},"repo":{"name":"r"},"created_at":"d"}"#,
        "\n",
        r#"{"id":"2","type":"WatchEvent","actor":{"login":"b"},"repo":{"name":"r"},"created_at":"d"}"#,
    );

    #[test]
    fn test_valid_event_parsing_works() {
        let data = r#"{"id":"123","type":"PushEvent","actor":{"login":"coder"},"repo":{"name":"rust-lang/rust"},"created_at":"2026-03-12"}"#;

        let count = process_events(data.as_bytes(), None).expect("a valid event must be processed");

        assert_eq!(count, 1);
    }

    #[test]
    fn test_invalid_event_parsing_fails() {
        // Deliberately malformed: no comma between the `repo` object and `created_at`.
        let data = r#"{"id":"123","type":"PushEvent","actor":{"login":"coder"},"repo":{"name":"rust-lang/rust"}"created_at":"2026-03-12"}"#;

        let result = process_events(data.as_bytes(), None);

        assert!(matches!(result, Err(OctoFlowError::ParseError(_))));
    }

    #[test]
    fn test_filter_works() {
        let count =
            process_events(PUSH_THEN_WATCH.as_bytes(), Some("WatchEvent".to_string())).unwrap();

        assert_eq!(count, 1);
    }

    #[test]
    fn test_no_filter_counts_every_event() {
        let count = process_events(PUSH_THEN_WATCH.as_bytes(), None).unwrap();

        assert_eq!(count, 2);
    }

    #[test]
    fn test_filter_without_match_returns_zero() {
        let count =
            process_events(PUSH_THEN_WATCH.as_bytes(), Some("ForkEvent".to_string())).unwrap();

        assert_eq!(count, 0);
    }

    #[test]
    fn test_empty_input_returns_zero() {
        let count = process_events("".as_bytes(), None).unwrap();

        assert_eq!(count, 0);
    }

    #[test]
    fn test_invalid_line_after_valid_ones_fails() {
        let data = format!("{PUSH_THEN_WATCH}\nnot json");

        let result = process_events(data.as_bytes(), None);

        assert!(matches!(result, Err(OctoFlowError::ParseError(_))));
    }
}