1use git2::{FetchOptions, Oid, PushOptions, RemoteCallbacks, Repository};
7use libgrite_core::types::event::Event;
8use libgrite_core::types::ids::ActorId;
9use std::cell::RefCell;
10use std::path::Path;
11use std::rc::Rc;
12
13use crate::wal::WalManager;
14use crate::GitError;
15
16pub const GRITE_REFSPEC: &str = "refs/grite/*:refs/grite/*";
18
19#[derive(Debug)]
21pub struct PullResult {
22 pub success: bool,
24 pub new_wal_head: Option<Oid>,
26 pub events_pulled: usize,
28 pub message: String,
30}
31
32#[derive(Debug)]
34pub struct PushResult {
35 pub success: bool,
37 pub rebased: bool,
39 pub events_rebased: usize,
41 pub message: String,
43}
44
45pub struct SyncManager {
47 repo: Repository,
48 git_dir: std::path::PathBuf,
49}
50
51impl SyncManager {
52 pub fn open(git_dir: &Path) -> Result<Self, GitError> {
54 let repo_path = git_dir.parent().ok_or(GitError::NotARepo)?;
55 let repo = Repository::open(repo_path)?;
56 Ok(Self {
57 repo,
58 git_dir: git_dir.to_path_buf(),
59 })
60 }
61
62 pub fn pull(&self, remote_name: &str) -> Result<PullResult, GitError> {
64 let wal = WalManager::open(&self.git_dir)?;
65 let old_head = wal.head()?;
66
67 let mut remote = self.repo.find_remote(remote_name)?;
69 let refspecs = [GRITE_REFSPEC];
70
71 let config = self.repo.config()?;
72 let mut callbacks = RemoteCallbacks::new();
73 callbacks.credentials(move |url, username_from_url, allowed_types| {
74 if allowed_types.contains(git2::CredentialType::SSH_KEY) {
75 return git2::Cred::ssh_key_from_agent(username_from_url.unwrap_or("git"));
76 }
77 if allowed_types.contains(git2::CredentialType::USER_PASS_PLAINTEXT) {
78 if let Ok(cred) = git2::Cred::credential_helper(&config, url, username_from_url) {
79 return Ok(cred);
80 }
81 }
82 if allowed_types.contains(git2::CredentialType::USERNAME) {
83 return git2::Cred::username(username_from_url.unwrap_or("git"));
84 }
85 Err(git2::Error::from_str("no supported authentication method"))
86 });
87 callbacks.transfer_progress(|_stats| true);
88
89 let mut fetch_options = FetchOptions::new();
90 fetch_options.remote_callbacks(callbacks);
91
92 remote.fetch(&refspecs, Some(&mut fetch_options), None)?;
93
94 let new_head = wal.head()?;
96 let events_pulled = if new_head != old_head {
97 if let Some(_new_oid) = new_head {
98 if let Some(old_oid) = old_head {
99 wal.read_since(old_oid)?.len()
100 } else {
101 wal.read_all()?.len()
102 }
103 } else {
104 0
105 }
106 } else {
107 0
108 };
109
110 Ok(PullResult {
111 success: true,
112 new_wal_head: new_head,
113 events_pulled,
114 message: if events_pulled > 0 {
115 format!("Pulled {} new events", events_pulled)
116 } else {
117 "Already up to date".to_string()
118 },
119 })
120 }
121
122 pub fn push(&self, remote_name: &str) -> Result<PushResult, GitError> {
124 let refspecs: Vec<String> = self
126 .repo
127 .references()?
128 .filter_map(Result::ok)
129 .filter_map(|r| r.name().map(|n| n.to_string()))
130 .filter(|n| n.starts_with("refs/grite/"))
131 .map(|n| format!("{}:{}", n, n))
132 .collect();
133
134 if refspecs.is_empty() {
135 return Ok(PushResult {
136 success: true,
137 rebased: false,
138 events_rebased: 0,
139 message: "Nothing to push (no grite refs)".to_string(),
140 });
141 }
142
143 let mut remote = self.repo.find_remote(remote_name)?;
144 let refspec_strs: Vec<&str> = refspecs.iter().map(|s| s.as_str()).collect();
145
146 let push_error: Rc<RefCell<Option<String>>> = Rc::new(RefCell::new(None));
147 let push_error_clone = Rc::clone(&push_error);
148
149 let config = self.repo.config()?;
150 let mut callbacks = RemoteCallbacks::new();
151 callbacks.credentials(move |url, username_from_url, allowed_types| {
152 if allowed_types.contains(git2::CredentialType::SSH_KEY) {
153 return git2::Cred::ssh_key_from_agent(username_from_url.unwrap_or("git"));
154 }
155 if allowed_types.contains(git2::CredentialType::USER_PASS_PLAINTEXT) {
156 if let Ok(cred) = git2::Cred::credential_helper(&config, url, username_from_url) {
157 return Ok(cred);
158 }
159 }
160 if allowed_types.contains(git2::CredentialType::USERNAME) {
161 return git2::Cred::username(username_from_url.unwrap_or("git"));
162 }
163 Err(git2::Error::from_str("no supported authentication method"))
164 });
165 callbacks.push_update_reference(move |refname, status| {
166 if let Some(msg) = status {
167 *push_error_clone.borrow_mut() = Some(format!("{}: {}", refname, msg));
168 }
169 Ok(())
170 });
171
172 let mut push_options = PushOptions::new();
173 push_options.remote_callbacks(callbacks);
174
175 remote.push(&refspec_strs, Some(&mut push_options))?;
176
177 let error = push_error.borrow().clone();
179 if let Some(error_msg) = error {
180 return Ok(PushResult {
182 success: false,
183 rebased: false,
184 events_rebased: 0,
185 message: format!("Push rejected: {}", error_msg),
186 });
187 }
188
189 Ok(PushResult {
190 success: true,
191 rebased: false,
192 events_rebased: 0,
193 message: "Push successful".to_string(),
194 })
195 }
196
197 pub fn push_with_rebase(
206 &self,
207 remote_name: &str,
208 actor_id: &ActorId,
209 ) -> Result<PushResult, GitError> {
210 let wal = WalManager::open(&self.git_dir)?;
211
212 let local_head = wal.head()?;
214
215 let result = self.push(remote_name)?;
217 if result.success {
218 return Ok(result);
219 }
220
221 let local_events = if let Some(head_oid) = local_head {
224 wal.read_from_oid(head_oid)?
225 } else {
226 vec![]
227 };
228
229 self.pull(remote_name)?;
231
232 let remote_head = wal.head()?;
234 let remote_events = if let Some(head_oid) = remote_head {
235 wal.read_from_oid(head_oid)?
236 } else {
237 vec![]
238 };
239
240 let remote_event_ids: std::collections::HashSet<_> =
242 remote_events.iter().map(|e| e.event_id).collect();
243 let unique_local_events: Vec<Event> = local_events
244 .into_iter()
245 .filter(|e| !remote_event_ids.contains(&e.event_id))
246 .collect();
247
248 let events_rebased = unique_local_events.len();
250 if !unique_local_events.is_empty() {
251 wal.append(actor_id, &unique_local_events)?;
252 }
253
254 let retry_result = self.push(remote_name)?;
256
257 Ok(PushResult {
258 success: retry_result.success,
259 rebased: true,
260 events_rebased,
261 message: if retry_result.success {
262 format!(
263 "Push successful after rebase ({} events rebased)",
264 events_rebased
265 )
266 } else {
267 retry_result.message
268 },
269 })
270 }
271
272 pub fn sync(&self, remote_name: &str) -> Result<(PullResult, PushResult), GitError> {
274 let pull_result = self.pull(remote_name)?;
275 let push_result = self.push(remote_name)?;
276 Ok((pull_result, push_result))
277 }
278
279 pub fn sync_with_rebase(
281 &self,
282 remote_name: &str,
283 actor_id: &ActorId,
284 ) -> Result<(PullResult, PushResult), GitError> {
285 let pull_result = self.pull(remote_name)?;
286 let push_result = self.push_with_rebase(remote_name, actor_id)?;
287 Ok((pull_result, push_result))
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 #[test]
297 fn test_sync_manager_opens() {
298 use std::process::Command;
299 use tempfile::TempDir;
300
301 let temp = TempDir::new().unwrap();
302 Command::new("git")
303 .args(["init"])
304 .current_dir(temp.path())
305 .output()
306 .unwrap();
307
308 let git_dir = temp.path().join(".git");
309 let mgr = super::SyncManager::open(&git_dir);
310 assert!(mgr.is_ok());
311 }
312}