rs_git_log2arrow_ipc_stream/
lib.rs

1use arrow::array::{ArrayRef, ListBuilder, StringBuilder, TimestampSecondBuilder};
2use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
3use arrow::ipc::writer::StreamWriter;
4use arrow::record_batch::RecordBatch;
5use chrono::DateTime;
6use gix::Repository;
7use gix::bstr::ByteSlice;
8use std::io;
9use std::io::Write;
10use std::sync::Arc;
11
12pub fn get_arrow_schema() -> Schema {
13    Schema::new(vec![
14        Field::new("commit_hash", DataType::Utf8, false),
15        Field::new("message", DataType::Utf8, false),
16        Field::new("author_name", DataType::Utf8, false),
17        Field::new("author_email", DataType::Utf8, false),
18        Field::new(
19            "author_timestamp",
20            DataType::Timestamp(TimeUnit::Second, None),
21            false,
22        ),
23        Field::new("committer_name", DataType::Utf8, false),
24        Field::new("committer_email", DataType::Utf8, false),
25        Field::new(
26            "committer_timestamp",
27            DataType::Timestamp(TimeUnit::Second, None),
28            false,
29        ),
30        Field::new(
31            "parent_hashes",
32            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
33            false,
34        ),
35    ])
36}
37
38pub fn log2arrow_ipc_stream_writer<W>(
39    repo: &Repository,
40    wtr: &mut W,
41    trim_message: bool,
42    max_count: usize,
43    author_filter: Option<String>,
44    since: Option<String>,
45    until: Option<String>,
46) -> Result<(), io::Error>
47where
48    W: Write,
49{
50    let schema = get_arrow_schema();
51    let mut writer = StreamWriter::try_new(wtr, &schema).map_err(io::Error::other)?;
52
53    let head = repo.head_commit().map_err(io::Error::other)?;
54
55    let since_timestamp = since
56        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
57        .map(|dt| dt.timestamp());
58    let until_timestamp = until
59        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
60        .map(|dt| dt.timestamp());
61
62    let mut hash_builder = StringBuilder::new();
63    let mut message_builder = StringBuilder::new();
64    let mut author_name_builder = StringBuilder::new();
65    let mut author_email_builder = StringBuilder::new();
66    let mut author_timestamp_builder = TimestampSecondBuilder::new();
67    let mut committer_name_builder = StringBuilder::new();
68    let mut committer_email_builder = StringBuilder::new();
69    let mut committer_timestamp_builder = TimestampSecondBuilder::new();
70    let mut parent_hashes_builder = ListBuilder::new(StringBuilder::new());
71
72    let commits = head.ancestors().all().map_err(io::Error::other)?;
73    for commit_info in commits.take(max_count) {
74        let commit_info = commit_info.map_err(io::Error::other)?;
75        let commit = commit_info.object().map_err(io::Error::other)?;
76
77        let author = commit.author().map_err(io::Error::other)?;
78
79        if let Some(filter_author) = &author_filter
80            && author.name != *filter_author
81        {
82            continue;
83        }
84
85        if let Some(since) = since_timestamp
86            && author.time.seconds < since
87        {
88            continue;
89        }
90
91        if let Some(until) = until_timestamp
92            && author.time.seconds > until
93        {
94            continue;
95        }
96
97        hash_builder.append_value(commit_info.id.to_string());
98
99        let message = commit.message_raw().map_err(io::Error::other)?;
100        if trim_message {
101            message_builder.append_value(&String::from_utf8_lossy(message.trim_end()));
102        } else {
103            message_builder.append_value(message.to_string());
104        }
105
106        author_name_builder.append_value(author.name.to_string());
107        author_email_builder.append_value(author.email.to_string());
108        author_timestamp_builder.append_value(author.time.seconds);
109
110        let committer = commit.committer().map_err(io::Error::other)?;
111        committer_name_builder.append_value(committer.name.to_string());
112        committer_email_builder.append_value(committer.email.to_string());
113        committer_timestamp_builder.append_value(committer.time.seconds);
114
115        let parents_builder = parent_hashes_builder.values();
116        for parent_id in commit.parent_ids() {
117            parents_builder.append_value(parent_id.to_string());
118        }
119        parent_hashes_builder.append(true);
120    }
121
122    let batch = RecordBatch::try_new(
123        Arc::new(schema.clone()),
124        vec![
125            Arc::new(hash_builder.finish()) as ArrayRef,
126            Arc::new(message_builder.finish()) as ArrayRef,
127            Arc::new(author_name_builder.finish()) as ArrayRef,
128            Arc::new(author_email_builder.finish()) as ArrayRef,
129            Arc::new(author_timestamp_builder.finish()) as ArrayRef,
130            Arc::new(committer_name_builder.finish()) as ArrayRef,
131            Arc::new(committer_email_builder.finish()) as ArrayRef,
132            Arc::new(committer_timestamp_builder.finish()) as ArrayRef,
133            Arc::new(parent_hashes_builder.finish()) as ArrayRef,
134        ],
135    )
136    .map_err(io::Error::other)?;
137
138    writer.write(&batch).map_err(io::Error::other)?;
139    writer.finish().map_err(io::Error::other)?;
140
141    Ok(())
142}