rs_git_log2arrow_ipc_stream/
lib.rs1use arrow::array::{ArrayRef, ListBuilder, StringBuilder, TimestampSecondBuilder};
2use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
3use arrow::ipc::writer::StreamWriter;
4use arrow::record_batch::RecordBatch;
5use chrono::DateTime;
6use gix::Repository;
7use gix::bstr::ByteSlice;
8use std::io;
9use std::io::Write;
10use std::sync::Arc;
11
12pub fn get_arrow_schema() -> Schema {
13 Schema::new(vec![
14 Field::new("commit_hash", DataType::Utf8, false),
15 Field::new("message", DataType::Utf8, false),
16 Field::new("author_name", DataType::Utf8, false),
17 Field::new("author_email", DataType::Utf8, false),
18 Field::new(
19 "author_timestamp",
20 DataType::Timestamp(TimeUnit::Second, None),
21 false,
22 ),
23 Field::new("committer_name", DataType::Utf8, false),
24 Field::new("committer_email", DataType::Utf8, false),
25 Field::new(
26 "committer_timestamp",
27 DataType::Timestamp(TimeUnit::Second, None),
28 false,
29 ),
30 Field::new(
31 "parent_hashes",
32 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
33 false,
34 ),
35 ])
36}
37
38pub fn log2arrow_ipc_stream_writer<W>(
39 repo: &Repository,
40 wtr: &mut W,
41 trim_message: bool,
42 max_count: usize,
43 author_filter: Option<String>,
44 since: Option<String>,
45 until: Option<String>,
46) -> Result<(), io::Error>
47where
48 W: Write,
49{
50 let schema = get_arrow_schema();
51 let mut writer = StreamWriter::try_new(wtr, &schema).map_err(io::Error::other)?;
52
53 let head = repo.head_commit().map_err(io::Error::other)?;
54
55 let since_timestamp = since
56 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
57 .map(|dt| dt.timestamp());
58 let until_timestamp = until
59 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
60 .map(|dt| dt.timestamp());
61
62 let mut hash_builder = StringBuilder::new();
63 let mut message_builder = StringBuilder::new();
64 let mut author_name_builder = StringBuilder::new();
65 let mut author_email_builder = StringBuilder::new();
66 let mut author_timestamp_builder = TimestampSecondBuilder::new();
67 let mut committer_name_builder = StringBuilder::new();
68 let mut committer_email_builder = StringBuilder::new();
69 let mut committer_timestamp_builder = TimestampSecondBuilder::new();
70 let mut parent_hashes_builder = ListBuilder::new(StringBuilder::new());
71
72 let commits = head.ancestors().all().map_err(io::Error::other)?;
73 for commit_info in commits.take(max_count) {
74 let commit_info = commit_info.map_err(io::Error::other)?;
75 let commit = commit_info.object().map_err(io::Error::other)?;
76
77 let author = commit.author().map_err(io::Error::other)?;
78
79 if let Some(filter_author) = &author_filter
80 && author.name != *filter_author
81 {
82 continue;
83 }
84
85 if let Some(since) = since_timestamp
86 && author.time.seconds < since
87 {
88 continue;
89 }
90
91 if let Some(until) = until_timestamp
92 && author.time.seconds > until
93 {
94 continue;
95 }
96
97 hash_builder.append_value(commit_info.id.to_string());
98
99 let message = commit.message_raw().map_err(io::Error::other)?;
100 if trim_message {
101 message_builder.append_value(&String::from_utf8_lossy(message.trim_end()));
102 } else {
103 message_builder.append_value(message.to_string());
104 }
105
106 author_name_builder.append_value(author.name.to_string());
107 author_email_builder.append_value(author.email.to_string());
108 author_timestamp_builder.append_value(author.time.seconds);
109
110 let committer = commit.committer().map_err(io::Error::other)?;
111 committer_name_builder.append_value(committer.name.to_string());
112 committer_email_builder.append_value(committer.email.to_string());
113 committer_timestamp_builder.append_value(committer.time.seconds);
114
115 let parents_builder = parent_hashes_builder.values();
116 for parent_id in commit.parent_ids() {
117 parents_builder.append_value(parent_id.to_string());
118 }
119 parent_hashes_builder.append(true);
120 }
121
122 let batch = RecordBatch::try_new(
123 Arc::new(schema.clone()),
124 vec![
125 Arc::new(hash_builder.finish()) as ArrayRef,
126 Arc::new(message_builder.finish()) as ArrayRef,
127 Arc::new(author_name_builder.finish()) as ArrayRef,
128 Arc::new(author_email_builder.finish()) as ArrayRef,
129 Arc::new(author_timestamp_builder.finish()) as ArrayRef,
130 Arc::new(committer_name_builder.finish()) as ArrayRef,
131 Arc::new(committer_email_builder.finish()) as ArrayRef,
132 Arc::new(committer_timestamp_builder.finish()) as ArrayRef,
133 Arc::new(parent_hashes_builder.finish()) as ArrayRef,
134 ],
135 )
136 .map_err(io::Error::other)?;
137
138 writer.write(&batch).map_err(io::Error::other)?;
139 writer.finish().map_err(io::Error::other)?;
140
141 Ok(())
142}