rs_git_status2arrow_ipc_stream/
lib.rs1use arrow::array::{
2 ArrayRef, DictionaryArray, StringBuilder, TimestampSecondBuilder, UInt64Builder,
3};
4use arrow::datatypes::{DataType, Field, Int32Type, Schema, TimeUnit};
5use arrow::ipc::writer::StreamWriter;
6use arrow::record_batch::RecordBatch;
7use gix::bstr::ByteSlice;
8use std::io;
9use std::path::Path;
10use std::sync::Arc;
11
12use io::Write;
13
14use gix::Progress;
15use gix::Repository;
16
17use gix::status::Item as GixStatusItem;
18use gix::status::Platform;
19use gix::status::index_worktree::Item as GixStatusWorkTreeItem;
20use gix::status::index_worktree::iter::Summary as GixSummary;
21
22use gix::diff::index::Change as GixChange;
23
24use serde::Serialize;
25
26#[derive(Debug, Serialize)]
27pub enum StatusDto {
28 Removed,
29 Added,
30 Modified,
31 TypeChange,
32 Renamed,
33 Copied,
34 IntentToAdd,
35 Conflict,
36 Untracked,
37}
38
39#[derive(Debug, Serialize)]
40#[serde(untagged)]
41pub enum StatusItemDto {
42 IndexWorktree { path: String, status: StatusDto },
43 TreeIndex { path: String, status: StatusDto },
44}
45
46impl From<&GixStatusItem> for StatusItemDto {
47 fn from(item: &GixStatusItem) -> Self {
48 match item {
49 GixStatusItem::IndexWorktree(iw_item) => {
50 let status = match iw_item.summary() {
51 Some(GixSummary::Removed) => StatusDto::Removed,
52 Some(GixSummary::Added) => StatusDto::Added,
53 Some(GixSummary::Modified) => StatusDto::Modified,
54 Some(GixSummary::TypeChange) => StatusDto::TypeChange,
55 Some(GixSummary::Renamed) => StatusDto::Renamed,
56 Some(GixSummary::Copied) => StatusDto::Copied,
57 Some(GixSummary::IntentToAdd) => StatusDto::IntentToAdd,
58 Some(GixSummary::Conflict) => StatusDto::Conflict,
59 None => StatusDto::Untracked,
60 };
61 StatusItemDto::IndexWorktree {
62 path: iw_item.rela_path().to_string(),
63 status,
64 }
65 }
66 GixStatusItem::TreeIndex(ti_change) => {
67 let (path, status) = match ti_change {
68 GixChange::Addition { location, .. } => {
69 (location.to_string(), StatusDto::Added)
70 }
71 GixChange::Deletion { location, .. } => {
72 (location.to_string(), StatusDto::Removed)
73 }
74 GixChange::Modification { location, .. } => {
75 (location.to_string(), StatusDto::Modified)
76 }
77 GixChange::Rewrite { location, .. } => {
78 (location.to_string(), StatusDto::Renamed)
79 }
80 };
81 StatusItemDto::TreeIndex { path, status }
82 }
83 }
84 }
85}
86
87pub struct GitDir<P>(pub P);
88
89impl<P> GitDir<P>
90where
91 P: AsRef<Path>,
92{
93 pub fn discover(&self) -> Result<Repository, io::Error> {
94 gix::discover(self.0.as_ref()).map_err(io::Error::other)
95 }
96}
97
98pub struct GitRepo(pub Repository);
99
100impl GitRepo {
101 pub fn status<P>(&self, progress: P) -> Result<Platform<'_, P>, io::Error>
102 where
103 P: Progress,
104 {
105 self.0.status(progress).map_err(io::Error::other)
106 }
107}
108
109pub struct GitStatus<'a, P>(pub Platform<'a, P>)
110where
111 P: Progress + 'static;
112
113impl<'a, P> GitStatus<'a, P>
114where
115 P: Progress + 'static,
116{
117 pub fn iter(self) -> Result<impl Iterator<Item = Result<GixStatusItem, io::Error>>, io::Error> {
118 self.0
119 .into_iter(vec![])
120 .map_err(io::Error::other)
121 .map(|i| i.map(|r| r.map_err(io::Error::other)))
122 }
123}
124
125pub struct GitStatusItemWorktree(pub GixStatusWorkTreeItem);
126
127pub struct GitStatusIndexChange(pub GixChange);
128
129pub fn status2json2writer<W>(status: &GixStatusItem, wtr: &mut W) -> Result<(), io::Error>
130where
131 W: Write,
132{
133 let dto = StatusItemDto::from(status);
134 serde_json::to_writer(&mut *wtr, &dto)?;
135 writeln!(wtr)?; Ok(())
137}
138
139pub fn get_arrow_schema() -> Schema {
140 Schema::new(vec![
141 Field::new("path", DataType::Utf8, false),
142 Field::new(
143 "status",
144 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
145 false,
146 ),
147 Field::new(
148 "item_type",
149 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
150 false,
151 ),
152 Field::new("extension", DataType::Utf8, true),
153 Field::new("size", DataType::UInt64, true),
154 Field::new(
155 "last_modification_time",
156 DataType::Timestamp(TimeUnit::Second, None),
157 true,
158 ),
159 ])
160}
161
162pub fn status2arrow_ipc_stream_writer<W>(
163 items: &[GixStatusItem],
164 wtr: &mut W,
165) -> Result<(), io::Error>
166where
167 W: Write,
168{
169 let schema = get_arrow_schema();
170 let mut path_builder = StringBuilder::new();
171 let mut extension_builder = StringBuilder::new();
172 let mut size_builder = UInt64Builder::new();
173 let mut mtime_builder = TimestampSecondBuilder::new();
174
175 for item in items {
176 match item {
177 GixStatusItem::IndexWorktree(iw_item) => {
178 let path = iw_item.rela_path();
179 path_builder.append_value(path.to_string());
180 let extension = path
181 .to_path()
182 .ok()
183 .and_then(|p| p.extension())
184 .and_then(|s| s.to_str())
185 .unwrap_or("");
186 extension_builder.append_value(extension);
187 if let Ok(metadata) = std::fs::metadata(path.to_string()) {
188 size_builder.append_value(metadata.len());
189 if let Ok(mtime) = metadata.modified() {
190 if let Ok(duration) = mtime.duration_since(std::time::UNIX_EPOCH) {
191 mtime_builder.append_value(duration.as_secs() as i64);
192 } else {
193 mtime_builder.append_null();
194 }
195 } else {
196 mtime_builder.append_null();
197 }
198 } else {
199 size_builder.append_null();
200 mtime_builder.append_null();
201 }
202 }
203 GixStatusItem::TreeIndex(ti_change) => {
204 let path = match ti_change {
205 GixChange::Addition { location, .. } => location,
206 GixChange::Deletion { location, .. } => location,
207 GixChange::Modification { location, .. } => location,
208 GixChange::Rewrite { location, .. } => location,
209 };
210 path_builder.append_value(path.to_string());
211 let extension = path
212 .to_path()
213 .ok()
214 .and_then(|p| p.extension())
215 .and_then(|s| s.to_str())
216 .unwrap_or("");
217 extension_builder.append_value(extension);
218 size_builder.append_null();
219 mtime_builder.append_null();
220 }
221 }
222 }
223 let path_array = Arc::new(path_builder.finish()) as ArrayRef;
224 let extension_array = Arc::new(extension_builder.finish()) as ArrayRef;
225 let size_array = Arc::new(size_builder.finish()) as ArrayRef;
226 let mtime_array = Arc::new(mtime_builder.finish()) as ArrayRef;
227
228 let status_values: Vec<_> = items
229 .iter()
230 .map(|item| {
231 let dto = StatusItemDto::from(item);
232 match dto {
233 StatusItemDto::IndexWorktree { status, .. } => format!("{:?}", status),
234 StatusItemDto::TreeIndex { status, .. } => format!("{:?}", status),
235 }
236 })
237 .collect();
238 let status_array = Arc::new(DictionaryArray::<Int32Type>::from_iter(
239 status_values.iter().map(|s| s.as_str()),
240 )) as ArrayRef;
241
242 let item_type_values: Vec<_> = items
243 .iter()
244 .map(|item| match item {
245 GixStatusItem::IndexWorktree(_) => "IndexWorktree",
246 GixStatusItem::TreeIndex(_) => "TreeIndex",
247 })
248 .collect();
249 let item_type_array = Arc::new(DictionaryArray::<Int32Type>::from_iter(
250 item_type_values.iter().copied(),
251 )) as ArrayRef;
252
253 let batch = RecordBatch::try_new(
254 Arc::new(schema.clone()),
255 vec![
256 path_array,
257 status_array,
258 item_type_array,
259 extension_array,
260 size_array,
261 mtime_array,
262 ],
263 )
264 .map_err(io::Error::other)?;
265
266 let mut writer = StreamWriter::try_new(wtr, &schema).map_err(io::Error::other)?;
267 writer.write(&batch).map_err(io::Error::other)?;
268 writer.finish().map_err(io::Error::other)?;
269
270 Ok(())
271}