1use std::collections::VecDeque;
2use std::io::{self, BufRead, Write};
3use std::path::PathBuf;
4use std::sync::{Arc, Condvar, Mutex};
5use std::time::{Duration, Instant};
6
7use rns_core::msgpack::{self, Value};
8use rns_core::types::{DestHash, LinkId};
9use rns_net::{AnnouncedIdentity, Callbacks, PacketHash, RnsNode};
10
11use crate::config::ClientConfig;
12use crate::logging;
13use crate::protocol::{self, RefUpdate};
14use crate::util::{
15 default_reticulum_dir, default_rngit_dir, load_or_create_identity, parse_rns_url,
16};
17use crate::{git, Error, Result};
18
19pub fn main<I>(args: I) -> Result<()>
20where
21 I: IntoIterator<Item = String>,
22{
23 let options = ClientOptions::parse(args)?;
24 let (dest_hash, repository) = parse_rns_url(&options.url)?;
25 let rngit_dir = options.config_dir.unwrap_or_else(default_rngit_dir);
26 let rns_dir = options.rns_config_dir.or_else(default_reticulum_dir);
27 let (config, created) = ClientConfig::load_or_create(rngit_dir, rns_dir)?;
28 logging::init_file_logger(&config.dir.join("client_log"), config.log_level)?;
29 if created {
30 return Err(Error::msg(format!(
31 "created default config at {}; edit it and run again",
32 config.dir.join("client_config").display()
33 )));
34 }
35
36 let helper = RemoteHelper::connect(config, dest_hash)?;
37 helper.run(repository)
38}
39
/// Git remote helper bound to one RNS destination.
///
/// Created by `RemoteHelper::connect`; its methods translate remote-helper commands into
/// requests sent through `client`.
struct RemoteHelper {
    /// Blocking request/response client for the link opened in `connect`.
    client: SyncClient,
}
43
44impl RemoteHelper {
45 fn connect(config: ClientConfig, dest_hash: [u8; 16]) -> Result<Self> {
46 let callbacks = SharedCallbacks::default();
47 let state = callbacks.state.clone();
48 let node = RnsNode::from_config(config.reticulum_dir.as_deref(), Box::new(callbacks))?;
49 let client_identity = load_or_create_identity(&config.identity_path)?;
50
51 let dest = DestHash(dest_hash);
52 node.request_path(&dest)
53 .map_err(|_| Error::msg("failed to request destination path"))?;
54 let deadline = Instant::now() + Duration::from_secs(config.connect_timeout_secs);
55 while !node.has_path(&dest).unwrap_or(false) && Instant::now() < deadline {
56 std::thread::sleep(Duration::from_millis(250));
57 }
58
59 let recalled = node
60 .recall_identity(&dest)
61 .map_err(|_| Error::msg("failed to recall destination identity"))?
62 .ok_or_else(|| Error::msg("destination identity is unknown"))?;
63 let sig_pub: [u8; 32] = recalled.public_key[32..64].try_into().unwrap();
64 let link_id = node
65 .create_link(dest_hash, sig_pub)
66 .map_err(|_| Error::msg("failed to create RNS link"))?;
67 let private_key = client_identity
68 .get_private_key()
69 .ok_or_else(|| Error::msg("client identity has no private key"))?;
70 node.identify_on_link(link_id, private_key)
71 .map_err(|_| Error::msg("failed to identify on RNS link"))?;
72
73 wait_for_link(
74 &state,
75 link_id,
76 Duration::from_secs(config.connect_timeout_secs),
77 )?;
78 Ok(Self {
79 client: SyncClient {
80 node,
81 link_id,
82 state,
83 request_timeout: Duration::from_secs(config.request_timeout_secs),
84 },
85 })
86 }
87
88 fn run(self, repository: String) -> Result<()> {
89 let stdin = io::stdin();
90 let stdout = io::stdout();
91 self.run_io(repository, stdin.lock(), stdout.lock())
92 }
93
94 fn run_io<R: BufRead, W: Write>(
95 &self,
96 repository: String,
97 mut input: R,
98 mut output: W,
99 ) -> Result<()> {
100 let mut line = String::new();
101 let mut fetch_refs = Vec::new();
102 let mut push_specs = Vec::new();
103
104 loop {
105 line.clear();
106 if input.read_line(&mut line)? == 0 {
107 break;
108 }
109 let command = line.trim_end();
110 if command.is_empty() {
111 if !fetch_refs.is_empty() {
112 self.fetch(&repository, &fetch_refs)?;
113 fetch_refs.clear();
114 writeln!(output)?;
115 output.flush()?;
116 }
117 if !push_specs.is_empty() {
118 for spec in push_specs.drain(..) {
119 self.push(&repository, &spec)?;
120 writeln!(output, "ok {}", spec.remote)?;
121 }
122 writeln!(output)?;
123 output.flush()?;
124 }
125 continue;
126 }
127
128 if command == "capabilities" {
129 writeln!(output, "option")?;
130 writeln!(output, "list")?;
131 writeln!(output, "fetch")?;
132 writeln!(output, "push")?;
133 writeln!(output)?;
134 } else if command == "list" || command == "list for-push" {
135 let refs = self.list(&repository)?;
136 output.write_all(&refs)?;
137 writeln!(output)?;
138 } else if let Some(rest) = command.strip_prefix("option ") {
139 let _ = rest;
140 writeln!(output, "ok")?;
141 } else if let Some(rest) = command.strip_prefix("fetch ") {
142 fetch_refs.push(parse_fetch_command(rest)?);
143 } else if let Some(rest) = command.strip_prefix("push ") {
144 push_specs.push(parse_push_spec(rest)?);
145 } else {
146 writeln!(output, "error unsupported command")?;
147 }
148 output.flush()?;
149 }
150 Ok(())
151 }
152
153 fn list(&self, repository: &str) -> Result<Vec<u8>> {
154 let response = self.client.request(
155 protocol::PATH_LIST,
156 protocol::repository_request(repository),
157 )?;
158 let bytes = protocol::response_bin(&response.data)?;
159 decode_status(bytes)
160 }
161
162 fn fetch(&self, repository: &str, refs: &[FetchRef]) -> Result<()> {
163 let have = refs.iter().map(|r| r.sha.clone()).collect::<Vec<_>>();
164 let response = self.client.request(
165 protocol::PATH_FETCH,
166 protocol::fetch_request(repository, &have),
167 )?;
168 if let Some(metadata) = response.metadata {
169 ensure_metadata_ok(&metadata)?;
170 let bundle = protocol::response_bin(&response.data)?;
171 git::fetch_bundle_into_local(
172 &bundle,
173 &refs.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
174 )?;
175 return Ok(());
176 }
177 let bytes = protocol::response_bin(&response.data)?;
178 let bundle = decode_status(bytes)?;
179 if !bundle.is_empty() {
180 git::fetch_bundle_into_local(
181 &bundle,
182 &refs.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
183 )?;
184 }
185 Ok(())
186 }
187
188 fn push(&self, repository: &str, spec: &PushSpec) -> Result<()> {
189 let mut updates = Vec::new();
190 let mut bundle_refs = Vec::new();
191 if spec.local.is_empty() {
192 updates.push(RefUpdate {
193 refname: spec.remote.clone(),
194 old: None,
195 new: None,
196 force: spec.force,
197 });
198 } else {
199 let sha = git::local_ref_sha(&spec.local)?
200 .ok_or_else(|| Error::msg(format!("unknown local ref {}", spec.local)))?;
201 bundle_refs.push(spec.local.clone());
202 updates.push(RefUpdate {
203 refname: spec.remote.clone(),
204 old: None,
205 new: Some(sha),
206 force: spec.force,
207 });
208 }
209
210 let bundle = git::create_local_bundle(&bundle_refs)?;
211 let response = self.client.request(
212 protocol::PATH_PUSH,
213 protocol::push_request(repository, bundle, updates),
214 )?;
215 let bytes = protocol::response_bin(&response.data)?;
216 let _ = decode_status(bytes)?;
217 Ok(())
218 }
219}
220
/// Blocking request/response client on top of one RNS link.
struct SyncClient {
    /// Node used to send requests.
    node: RnsNode,
    /// Id of the link that requests are sent on.
    link_id: [u8; 16],
    /// State shared with the node callbacks; the condvar is notified when it changes.
    state: Arc<(Mutex<ClientState>, Condvar)>,
    /// Maximum time `request` waits for a response.
    request_timeout: Duration,
}
227
228impl SyncClient {
229 fn request(&self, path: &str, data: Vec<u8>) -> Result<Response> {
230 {
231 let (lock, _) = &*self.state;
232 lock.lock().unwrap().responses.clear();
233 }
234 self.node
235 .send_request(self.link_id, path, &data)
236 .map_err(|_| Error::msg("failed to send request"))?;
237 let deadline = Instant::now() + self.request_timeout;
238 let (lock, cv) = &*self.state;
239 let mut state = lock.lock().unwrap();
240 loop {
241 if let Some(response) = state.responses.pop_front() {
242 return Ok(response);
243 }
244 let now = Instant::now();
245 if now >= deadline {
246 return Err(Error::msg("request timed out"));
247 }
248 let wait = deadline.saturating_duration_since(now);
249 let (next, _) = cv.wait_timeout(state, wait).unwrap();
250 state = next;
251 }
252 }
253}
254
/// Callback sink handed to the node; records events into state shared with the client.
#[derive(Default)]
struct SharedCallbacks {
    /// Shared state plus the condvar that is notified after each recorded event.
    state: Arc<(Mutex<ClientState>, Condvar)>,
}
259
/// Events recorded by `SharedCallbacks` for the blocking client code to consume.
#[derive(Default)]
struct ClientState {
    /// Ids of links that were reported established.
    established: Vec<[u8; 16]>,
    /// Responses received so far, oldest first.
    responses: VecDeque<Response>,
}
265
/// A response as delivered by the `on_response_with_metadata` callback.
#[derive(Debug, Clone)]
struct Response {
    /// Response payload bytes.
    data: Vec<u8>,
    /// Optional metadata bytes delivered alongside the payload.
    metadata: Option<Vec<u8>>,
}
271
272impl Callbacks for SharedCallbacks {
273 fn on_announce(&mut self, _announced: AnnouncedIdentity) {}
274
275 fn on_path_updated(&mut self, _dest_hash: DestHash, _hops: u8) {}
276
277 fn on_local_delivery(&mut self, _dest_hash: DestHash, _raw: Vec<u8>, _packet_hash: PacketHash) {
278 }
279
280 fn on_link_established(
281 &mut self,
282 link_id: LinkId,
283 _dest_hash: DestHash,
284 _rtt: f64,
285 _is_initiator: bool,
286 ) {
287 let (lock, cv) = &*self.state;
288 lock.lock().unwrap().established.push(link_id.0);
289 cv.notify_all();
290 }
291
292 fn on_response_with_metadata(
293 &mut self,
294 _link_id: LinkId,
295 _request_id: [u8; 16],
296 data: Vec<u8>,
297 metadata: Option<Vec<u8>>,
298 ) {
299 let (lock, cv) = &*self.state;
300 lock.lock()
301 .unwrap()
302 .responses
303 .push_back(Response { data, metadata });
304 cv.notify_all();
305 }
306}
307
308fn wait_for_link(
309 state: &Arc<(Mutex<ClientState>, Condvar)>,
310 link_id: [u8; 16],
311 timeout: Duration,
312) -> Result<()> {
313 let deadline = Instant::now() + timeout;
314 let (lock, cv) = &**state;
315 let mut state = lock.lock().unwrap();
316 loop {
317 if state.established.contains(&link_id) {
318 return Ok(());
319 }
320 let now = Instant::now();
321 if now >= deadline {
322 return Err(Error::msg("link establishment timed out"));
323 }
324 let (next, _) = cv
325 .wait_timeout(state, deadline.saturating_duration_since(now))
326 .unwrap();
327 state = next;
328 }
329}
330
331fn decode_status(bytes: Vec<u8>) -> Result<Vec<u8>> {
332 let Some((&code, body)) = bytes.split_first() else {
333 return Err(Error::msg("empty response"));
334 };
335 if code == protocol::RES_OK {
336 Ok(body.to_vec())
337 } else {
338 Err(Error::msg(format!(
339 "remote returned status 0x{code:02x}: {}",
340 String::from_utf8_lossy(body)
341 )))
342 }
343}
344
345fn ensure_metadata_ok(metadata: &[u8]) -> Result<()> {
346 let value = msgpack::unpack_exact(metadata)
347 .map_err(|e| Error::msg(format!("invalid response metadata: {e}")))?;
348 let Some(map) = value.as_map() else {
349 return Err(Error::msg("response metadata is not a map"));
350 };
351 let code = map.iter().find_map(|(key, value)| {
352 if matches!(key, Value::UInt(v) if *v == protocol::IDX_RESULT_CODE) {
353 value.as_uint().map(|v| v as u8)
354 } else {
355 None
356 }
357 });
358 match code {
359 Some(protocol::RES_OK) => Ok(()),
360 Some(code) => Err(Error::msg(format!("remote returned status 0x{code:02x}"))),
361 None => Err(Error::msg("response metadata missing status code")),
362 }
363}
364
/// One parsed `fetch <sha> <ref>` command.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FetchRef {
    /// First token of the command.
    sha: String,
    /// Second token of the command.
    name: String,
}
370
371fn parse_fetch_command(input: &str) -> Result<FetchRef> {
372 let mut parts = input.split_whitespace();
373 let sha = parts
374 .next()
375 .ok_or_else(|| Error::msg("fetch command missing sha"))?;
376 let name = parts
377 .next()
378 .ok_or_else(|| Error::msg("fetch command missing ref"))?;
379 Ok(FetchRef {
380 sha: sha.to_string(),
381 name: name.to_string(),
382 })
383}
384
/// One parsed `push` refspec of the form `[+]<local>:<remote>`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PushSpec {
    /// Text before the first `:`; may be empty.
    local: String,
    /// Text after the first `:`.
    remote: String,
    /// Whether the spec started with `+`.
    force: bool,
}
391
392fn parse_push_spec(input: &str) -> Result<PushSpec> {
393 let (force, spec) = input
394 .strip_prefix('+')
395 .map(|s| (true, s))
396 .unwrap_or((false, input));
397 let (local, remote) = spec
398 .split_once(':')
399 .ok_or_else(|| Error::msg("push spec must be <local>:<remote>"))?;
400 Ok(PushSpec {
401 local: local.to_string(),
402 remote: remote.to_string(),
403 force,
404 })
405}
406
/// Command-line options of the remote helper.
#[derive(Debug, Default)]
struct ClientOptions {
    /// Value of `-c` / `--config`, if given.
    config_dir: Option<PathBuf>,
    /// Value of `--rnsconfig`, if given.
    rns_config_dir: Option<PathBuf>,
    /// The last positional argument.
    url: String,
}
413
414impl ClientOptions {
415 fn parse<I>(args: I) -> Result<Self>
416 where
417 I: IntoIterator<Item = String>,
418 {
419 let mut options = ClientOptions::default();
420 let mut positional = Vec::new();
421 let mut args = args.into_iter();
422 while let Some(arg) = args.next() {
423 match arg.as_str() {
424 "-c" | "--config" => {
425 options.config_dir = Some(PathBuf::from(
426 args.next()
427 .ok_or_else(|| Error::msg("missing config path"))?,
428 ));
429 }
430 "--rnsconfig" => {
431 options.rns_config_dir = Some(PathBuf::from(
432 args.next()
433 .ok_or_else(|| Error::msg("missing RNS config path"))?,
434 ));
435 }
436 "-h" | "--help" => return Err(Error::msg(usage())),
437 other => positional.push(other.to_string()),
438 }
439 }
440 options.url = positional
441 .last()
442 .cloned()
443 .ok_or_else(|| Error::msg(usage()))?;
444 Ok(options)
445 }
446}
447
/// Returns the one-line usage text; it is used as the error message for `--help` and for a
/// missing URL argument.
fn usage() -> &'static str {
    "usage: git-remote-rns [--config DIR] [--rnsconfig DIR] <remote-name> <rns://destination/repo>"
}
451
// Unit tests for the pure parsing and status-decoding helpers; nothing here touches the network.
#[cfg(test)]
mod tests {
    use super::*;

    // `fetch` arguments are split into sha and ref name.
    #[test]
    fn parses_fetch_command() {
        assert_eq!(
            parse_fetch_command("abc refs/heads/main").unwrap(),
            FetchRef {
                sha: "abc".into(),
                name: "refs/heads/main".into()
            }
        );
    }

    // A leading `+` is stripped from the local side and sets the force flag.
    #[test]
    fn parses_forced_push_spec() {
        assert_eq!(
            parse_push_spec("+refs/heads/main:refs/heads/main").unwrap(),
            PushSpec {
                local: "refs/heads/main".into(),
                remote: "refs/heads/main".into(),
                force: true
            }
        );
    }

    // An OK status yields the body; a non-OK status is an error.
    #[test]
    fn decodes_status_payload() {
        assert_eq!(
            decode_status(protocol::status_bytes(protocol::RES_OK, b"refs")).unwrap(),
            b"refs"
        );
        assert!(decode_status(protocol::status_bytes(protocol::RES_NOT_FOUND, b"no")).is_err());
    }

    // Metadata with an OK status passes; metadata with a failure status is rejected.
    #[test]
    fn metadata_status_ok_is_accepted() {
        assert!(ensure_metadata_ok(&protocol::metadata_status(protocol::RES_OK)).is_ok());
        assert!(ensure_metadata_ok(&protocol::metadata_status(protocol::RES_REMOTE_FAIL)).is_err());
    }
}