// rs_git_status2arrow_ipc_stream/lib.rs

1use arrow::array::{
2    ArrayRef, DictionaryArray, StringBuilder, TimestampSecondBuilder, UInt64Builder,
3};
4use arrow::datatypes::{DataType, Field, Int32Type, Schema, TimeUnit};
5use arrow::ipc::writer::StreamWriter;
6use arrow::record_batch::RecordBatch;
7use gix::bstr::ByteSlice;
8use std::io;
9use std::path::Path;
10use std::sync::Arc;
11
12use io::Write;
13
14use gix::Progress;
15use gix::Repository;
16
17use gix::status::Item as GixStatusItem;
18use gix::status::Platform;
19use gix::status::index_worktree::Item as GixStatusWorkTreeItem;
20use gix::status::index_worktree::iter::Summary as GixSummary;
21
22use gix::diff::index::Change as GixChange;
23
24use serde::Serialize;
25
/// Status label attached to every reported path.
///
/// serde serializes these unit variants by name (e.g. `"Modified"`), and the
/// `Debug` representation is likewise the bare variant name.
#[derive(Debug, Serialize)]
pub enum StatusDto {
    /// The path was deleted.
    Removed,
    /// The path was added.
    Added,
    /// The path's content changed.
    Modified,
    /// The path's entry type changed (gix `Summary::TypeChange`).
    TypeChange,
    /// The path is the destination of a rename.
    Renamed,
    /// The path is the destination of a copy.
    Copied,
    /// The index entry is marked intent-to-add (gix `Summary::IntentToAdd`).
    IntentToAdd,
    /// The index entry is in a conflicted state.
    Conflict,
    /// Used when gix reports no summary for an index/worktree item.
    Untracked,
}
38
/// Serializable view of one gix status item: its path plus a [`StatusDto`].
///
/// Because of `#[serde(untagged)]`, both variants serialize as a plain
/// `{"path": …, "status": …}` object. The variant name is not emitted, so the
/// JSON output does not say whether an item came from the index/worktree or
/// the tree/index comparison.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum StatusItemDto {
    /// Item from the index-vs-worktree comparison.
    IndexWorktree { path: String, status: StatusDto },
    /// Item from the tree-vs-index comparison.
    TreeIndex { path: String, status: StatusDto },
}
45
46impl From<&GixStatusItem> for StatusItemDto {
47    fn from(item: &GixStatusItem) -> Self {
48        match item {
49            GixStatusItem::IndexWorktree(iw_item) => {
50                let status = match iw_item.summary() {
51                    Some(GixSummary::Removed) => StatusDto::Removed,
52                    Some(GixSummary::Added) => StatusDto::Added,
53                    Some(GixSummary::Modified) => StatusDto::Modified,
54                    Some(GixSummary::TypeChange) => StatusDto::TypeChange,
55                    Some(GixSummary::Renamed) => StatusDto::Renamed,
56                    Some(GixSummary::Copied) => StatusDto::Copied,
57                    Some(GixSummary::IntentToAdd) => StatusDto::IntentToAdd,
58                    Some(GixSummary::Conflict) => StatusDto::Conflict,
59                    None => StatusDto::Untracked,
60                };
61                StatusItemDto::IndexWorktree {
62                    path: iw_item.rela_path().to_string(),
63                    status,
64                }
65            }
66            GixStatusItem::TreeIndex(ti_change) => {
67                let (path, status) = match ti_change {
68                    GixChange::Addition { location, .. } => {
69                        (location.to_string(), StatusDto::Added)
70                    }
71                    GixChange::Deletion { location, .. } => {
72                        (location.to_string(), StatusDto::Removed)
73                    }
74                    GixChange::Modification { location, .. } => {
75                        (location.to_string(), StatusDto::Modified)
76                    }
77                    GixChange::Rewrite { location, .. } => {
78                        (location.to_string(), StatusDto::Renamed)
79                    }
80                };
81                StatusItemDto::TreeIndex { path, status }
82            }
83        }
84    }
85}
86
87pub struct GitDir<P>(pub P);
88
89impl<P> GitDir<P>
90where
91    P: AsRef<Path>,
92{
93    pub fn discover(&self) -> Result<Repository, io::Error> {
94        gix::discover(self.0.as_ref()).map_err(io::Error::other)
95    }
96}
97
98pub struct GitRepo(pub Repository);
99
100impl GitRepo {
101    pub fn status<P>(&self, progress: P) -> Result<Platform<'_, P>, io::Error>
102    where
103        P: Progress,
104    {
105        self.0.status(progress).map_err(io::Error::other)
106    }
107}
108
109pub struct GitStatus<'a, P>(pub Platform<'a, P>)
110where
111    P: Progress + 'static;
112
113impl<'a, P> GitStatus<'a, P>
114where
115    P: Progress + 'static,
116{
117    pub fn iter(self) -> Result<impl Iterator<Item = Result<GixStatusItem, io::Error>>, io::Error> {
118        self.0
119            .into_iter(vec![])
120            .map_err(io::Error::other)
121            .map(|i| i.map(|r| r.map_err(io::Error::other)))
122    }
123}
124
/// Newtype around a gix index-vs-worktree status item.
// NOTE(review): not referenced by the other items in this file; confirm whether it is still needed.
pub struct GitStatusItemWorktree(pub GixStatusWorkTreeItem);

/// Newtype around a gix tree-vs-index change.
// NOTE(review): not referenced by the other items in this file; confirm whether it is still needed.
pub struct GitStatusIndexChange(pub GixChange);
128
129pub fn status2json2writer<W>(status: &GixStatusItem, wtr: &mut W) -> Result<(), io::Error>
130where
131    W: Write,
132{
133    let dto = StatusItemDto::from(status);
134    serde_json::to_writer(&mut *wtr, &dto)?;
135    writeln!(wtr)?; // Add a newline for pretty printing each JSON object
136    Ok(())
137}
138
139pub fn get_arrow_schema() -> Schema {
140    Schema::new(vec![
141        Field::new("path", DataType::Utf8, false),
142        Field::new(
143            "status",
144            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
145            false,
146        ),
147        Field::new(
148            "item_type",
149            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
150            false,
151        ),
152        Field::new("extension", DataType::Utf8, true),
153        Field::new("size", DataType::UInt64, true),
154        Field::new(
155            "last_modification_time",
156            DataType::Timestamp(TimeUnit::Second, None),
157            true,
158        ),
159    ])
160}
161
162pub fn status2arrow_ipc_stream_writer<W>(
163    items: &[GixStatusItem],
164    wtr: &mut W,
165) -> Result<(), io::Error>
166where
167    W: Write,
168{
169    let schema = get_arrow_schema();
170    let mut path_builder = StringBuilder::new();
171    let mut extension_builder = StringBuilder::new();
172    let mut size_builder = UInt64Builder::new();
173    let mut mtime_builder = TimestampSecondBuilder::new();
174
175    for item in items {
176        match item {
177            GixStatusItem::IndexWorktree(iw_item) => {
178                let path = iw_item.rela_path();
179                path_builder.append_value(path.to_string());
180                let extension = path
181                    .to_path()
182                    .ok()
183                    .and_then(|p| p.extension())
184                    .and_then(|s| s.to_str())
185                    .unwrap_or("");
186                extension_builder.append_value(extension);
187                if let Ok(metadata) = std::fs::metadata(path.to_string()) {
188                    size_builder.append_value(metadata.len());
189                    if let Ok(mtime) = metadata.modified() {
190                        if let Ok(duration) = mtime.duration_since(std::time::UNIX_EPOCH) {
191                            mtime_builder.append_value(duration.as_secs() as i64);
192                        } else {
193                            mtime_builder.append_null();
194                        }
195                    } else {
196                        mtime_builder.append_null();
197                    }
198                } else {
199                    size_builder.append_null();
200                    mtime_builder.append_null();
201                }
202            }
203            GixStatusItem::TreeIndex(ti_change) => {
204                let path = match ti_change {
205                    GixChange::Addition { location, .. } => location,
206                    GixChange::Deletion { location, .. } => location,
207                    GixChange::Modification { location, .. } => location,
208                    GixChange::Rewrite { location, .. } => location,
209                };
210                path_builder.append_value(path.to_string());
211                let extension = path
212                    .to_path()
213                    .ok()
214                    .and_then(|p| p.extension())
215                    .and_then(|s| s.to_str())
216                    .unwrap_or("");
217                extension_builder.append_value(extension);
218                size_builder.append_null();
219                mtime_builder.append_null();
220            }
221        }
222    }
223    let path_array = Arc::new(path_builder.finish()) as ArrayRef;
224    let extension_array = Arc::new(extension_builder.finish()) as ArrayRef;
225    let size_array = Arc::new(size_builder.finish()) as ArrayRef;
226    let mtime_array = Arc::new(mtime_builder.finish()) as ArrayRef;
227
228    let status_values: Vec<_> = items
229        .iter()
230        .map(|item| {
231            let dto = StatusItemDto::from(item);
232            match dto {
233                StatusItemDto::IndexWorktree { status, .. } => format!("{:?}", status),
234                StatusItemDto::TreeIndex { status, .. } => format!("{:?}", status),
235            }
236        })
237        .collect();
238    let status_array = Arc::new(DictionaryArray::<Int32Type>::from_iter(
239        status_values.iter().map(|s| s.as_str()),
240    )) as ArrayRef;
241
242    let item_type_values: Vec<_> = items
243        .iter()
244        .map(|item| match item {
245            GixStatusItem::IndexWorktree(_) => "IndexWorktree",
246            GixStatusItem::TreeIndex(_) => "TreeIndex",
247        })
248        .collect();
249    let item_type_array = Arc::new(DictionaryArray::<Int32Type>::from_iter(
250        item_type_values.iter().copied(),
251    )) as ArrayRef;
252
253    let batch = RecordBatch::try_new(
254        Arc::new(schema.clone()),
255        vec![
256            path_array,
257            status_array,
258            item_type_array,
259            extension_array,
260            size_array,
261            mtime_array,
262        ],
263    )
264    .map_err(io::Error::other)?;
265
266    let mut writer = StreamWriter::try_new(wtr, &schema).map_err(io::Error::other)?;
267    writer.write(&batch).map_err(io::Error::other)?;
268    writer.finish().map_err(io::Error::other)?;
269
270    Ok(())
271}