Skip to main content

rns_git/
client.rs

1use std::collections::VecDeque;
2use std::io::{self, BufRead, Write};
3use std::path::PathBuf;
4use std::sync::{Arc, Condvar, Mutex};
5use std::time::{Duration, Instant};
6
7use rns_core::msgpack::{self, Value};
8use rns_core::types::{DestHash, LinkId};
9use rns_net::{AnnouncedIdentity, Callbacks, PacketHash, RnsNode};
10
11use crate::config::ClientConfig;
12use crate::logging;
13use crate::protocol::{self, RefUpdate};
14use crate::util::{
15    default_reticulum_dir, default_rngit_dir, load_or_create_identity, parse_rns_url,
16};
17use crate::{git, Error, Result};
18
19pub fn main<I>(args: I) -> Result<()>
20where
21    I: IntoIterator<Item = String>,
22{
23    let options = ClientOptions::parse(args)?;
24    let (dest_hash, repository) = parse_rns_url(&options.url)?;
25    let rngit_dir = options.config_dir.unwrap_or_else(default_rngit_dir);
26    let rns_dir = options.rns_config_dir.or_else(default_reticulum_dir);
27    let (config, created) = ClientConfig::load_or_create(rngit_dir, rns_dir)?;
28    logging::init_file_logger(&config.dir.join("client_log"), config.log_level)?;
29    if created {
30        return Err(Error::msg(format!(
31            "created default config at {}; edit it and run again",
32            config.dir.join("client_config").display()
33        )));
34    }
35
36    let helper = RemoteHelper::connect(config, dest_hash)?;
37    helper.run(repository)
38}
39
40struct RemoteHelper {
41    client: SyncClient,
42}
43
44impl RemoteHelper {
45    fn connect(config: ClientConfig, dest_hash: [u8; 16]) -> Result<Self> {
46        let callbacks = SharedCallbacks::default();
47        let state = callbacks.state.clone();
48        let node = RnsNode::from_config(config.reticulum_dir.as_deref(), Box::new(callbacks))?;
49        let client_identity = load_or_create_identity(&config.identity_path)?;
50
51        let dest = DestHash(dest_hash);
52        node.request_path(&dest)
53            .map_err(|_| Error::msg("failed to request destination path"))?;
54        let deadline = Instant::now() + Duration::from_secs(config.connect_timeout_secs);
55        while !node.has_path(&dest).unwrap_or(false) && Instant::now() < deadline {
56            std::thread::sleep(Duration::from_millis(250));
57        }
58
59        let recalled = node
60            .recall_identity(&dest)
61            .map_err(|_| Error::msg("failed to recall destination identity"))?
62            .ok_or_else(|| Error::msg("destination identity is unknown"))?;
63        let sig_pub: [u8; 32] = recalled.public_key[32..64].try_into().unwrap();
64        let link_id = node
65            .create_link(dest_hash, sig_pub)
66            .map_err(|_| Error::msg("failed to create RNS link"))?;
67        let private_key = client_identity
68            .get_private_key()
69            .ok_or_else(|| Error::msg("client identity has no private key"))?;
70        node.identify_on_link(link_id, private_key)
71            .map_err(|_| Error::msg("failed to identify on RNS link"))?;
72
73        wait_for_link(
74            &state,
75            link_id,
76            Duration::from_secs(config.connect_timeout_secs),
77        )?;
78        Ok(Self {
79            client: SyncClient {
80                node,
81                link_id,
82                state,
83                request_timeout: Duration::from_secs(config.request_timeout_secs),
84            },
85        })
86    }
87
88    fn run(self, repository: String) -> Result<()> {
89        let stdin = io::stdin();
90        let stdout = io::stdout();
91        self.run_io(repository, stdin.lock(), stdout.lock())
92    }
93
94    fn run_io<R: BufRead, W: Write>(
95        &self,
96        repository: String,
97        mut input: R,
98        mut output: W,
99    ) -> Result<()> {
100        let mut line = String::new();
101        let mut fetch_refs = Vec::new();
102        let mut push_specs = Vec::new();
103
104        loop {
105            line.clear();
106            if input.read_line(&mut line)? == 0 {
107                break;
108            }
109            let command = line.trim_end();
110            if command.is_empty() {
111                if !fetch_refs.is_empty() {
112                    self.fetch(&repository, &fetch_refs)?;
113                    fetch_refs.clear();
114                    writeln!(output)?;
115                    output.flush()?;
116                }
117                if !push_specs.is_empty() {
118                    for spec in push_specs.drain(..) {
119                        self.push(&repository, &spec)?;
120                        writeln!(output, "ok {}", spec.remote)?;
121                    }
122                    writeln!(output)?;
123                    output.flush()?;
124                }
125                continue;
126            }
127
128            if command == "capabilities" {
129                writeln!(output, "option")?;
130                writeln!(output, "list")?;
131                writeln!(output, "fetch")?;
132                writeln!(output, "push")?;
133                writeln!(output)?;
134            } else if command == "list" || command == "list for-push" {
135                let refs = self.list(&repository)?;
136                output.write_all(&refs)?;
137                writeln!(output)?;
138            } else if let Some(rest) = command.strip_prefix("option ") {
139                let _ = rest;
140                writeln!(output, "ok")?;
141            } else if let Some(rest) = command.strip_prefix("fetch ") {
142                fetch_refs.push(parse_fetch_command(rest)?);
143            } else if let Some(rest) = command.strip_prefix("push ") {
144                push_specs.push(parse_push_spec(rest)?);
145            } else {
146                writeln!(output, "error unsupported command")?;
147            }
148            output.flush()?;
149        }
150        Ok(())
151    }
152
153    fn list(&self, repository: &str) -> Result<Vec<u8>> {
154        let response = self.client.request(
155            protocol::PATH_LIST,
156            protocol::repository_request(repository),
157        )?;
158        let bytes = protocol::response_bin(&response.data)?;
159        decode_status(bytes)
160    }
161
162    fn fetch(&self, repository: &str, refs: &[FetchRef]) -> Result<()> {
163        let have = refs.iter().map(|r| r.sha.clone()).collect::<Vec<_>>();
164        let response = self.client.request(
165            protocol::PATH_FETCH,
166            protocol::fetch_request(repository, &have),
167        )?;
168        if let Some(metadata) = response.metadata {
169            ensure_metadata_ok(&metadata)?;
170            let bundle = protocol::response_bin(&response.data)?;
171            git::fetch_bundle_into_local(
172                &bundle,
173                &refs.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
174            )?;
175            return Ok(());
176        }
177        let bytes = protocol::response_bin(&response.data)?;
178        let bundle = decode_status(bytes)?;
179        if !bundle.is_empty() {
180            git::fetch_bundle_into_local(
181                &bundle,
182                &refs.iter().map(|r| r.name.clone()).collect::<Vec<_>>(),
183            )?;
184        }
185        Ok(())
186    }
187
188    fn push(&self, repository: &str, spec: &PushSpec) -> Result<()> {
189        let mut updates = Vec::new();
190        let mut bundle_refs = Vec::new();
191        if spec.local.is_empty() {
192            updates.push(RefUpdate {
193                refname: spec.remote.clone(),
194                old: None,
195                new: None,
196                force: spec.force,
197            });
198        } else {
199            let sha = git::local_ref_sha(&spec.local)?
200                .ok_or_else(|| Error::msg(format!("unknown local ref {}", spec.local)))?;
201            bundle_refs.push(spec.local.clone());
202            updates.push(RefUpdate {
203                refname: spec.remote.clone(),
204                old: None,
205                new: Some(sha),
206                force: spec.force,
207            });
208        }
209
210        let bundle = git::create_local_bundle(&bundle_refs)?;
211        let response = self.client.request(
212            protocol::PATH_PUSH,
213            protocol::push_request(repository, bundle, updates),
214        )?;
215        let bytes = protocol::response_bin(&response.data)?;
216        let _ = decode_status(bytes)?;
217        Ok(())
218    }
219}
220
221struct SyncClient {
222    node: RnsNode,
223    link_id: [u8; 16],
224    state: Arc<(Mutex<ClientState>, Condvar)>,
225    request_timeout: Duration,
226}
227
228impl SyncClient {
229    fn request(&self, path: &str, data: Vec<u8>) -> Result<Response> {
230        {
231            let (lock, _) = &*self.state;
232            lock.lock().unwrap().responses.clear();
233        }
234        self.node
235            .send_request(self.link_id, path, &data)
236            .map_err(|_| Error::msg("failed to send request"))?;
237        let deadline = Instant::now() + self.request_timeout;
238        let (lock, cv) = &*self.state;
239        let mut state = lock.lock().unwrap();
240        loop {
241            if let Some(response) = state.responses.pop_front() {
242                return Ok(response);
243            }
244            let now = Instant::now();
245            if now >= deadline {
246                return Err(Error::msg("request timed out"));
247            }
248            let wait = deadline.saturating_duration_since(now);
249            let (next, _) = cv.wait_timeout(state, wait).unwrap();
250            state = next;
251        }
252    }
253}
254
255#[derive(Default)]
256struct SharedCallbacks {
257    state: Arc<(Mutex<ClientState>, Condvar)>,
258}
259
260#[derive(Default)]
261struct ClientState {
262    established: Vec<[u8; 16]>,
263    responses: VecDeque<Response>,
264}
265
266#[derive(Debug, Clone)]
267struct Response {
268    data: Vec<u8>,
269    metadata: Option<Vec<u8>>,
270}
271
272impl Callbacks for SharedCallbacks {
273    fn on_announce(&mut self, _announced: AnnouncedIdentity) {}
274
275    fn on_path_updated(&mut self, _dest_hash: DestHash, _hops: u8) {}
276
277    fn on_local_delivery(&mut self, _dest_hash: DestHash, _raw: Vec<u8>, _packet_hash: PacketHash) {
278    }
279
280    fn on_link_established(
281        &mut self,
282        link_id: LinkId,
283        _dest_hash: DestHash,
284        _rtt: f64,
285        _is_initiator: bool,
286    ) {
287        let (lock, cv) = &*self.state;
288        lock.lock().unwrap().established.push(link_id.0);
289        cv.notify_all();
290    }
291
292    fn on_response_with_metadata(
293        &mut self,
294        _link_id: LinkId,
295        _request_id: [u8; 16],
296        data: Vec<u8>,
297        metadata: Option<Vec<u8>>,
298    ) {
299        let (lock, cv) = &*self.state;
300        lock.lock()
301            .unwrap()
302            .responses
303            .push_back(Response { data, metadata });
304        cv.notify_all();
305    }
306}
307
308fn wait_for_link(
309    state: &Arc<(Mutex<ClientState>, Condvar)>,
310    link_id: [u8; 16],
311    timeout: Duration,
312) -> Result<()> {
313    let deadline = Instant::now() + timeout;
314    let (lock, cv) = &**state;
315    let mut state = lock.lock().unwrap();
316    loop {
317        if state.established.contains(&link_id) {
318            return Ok(());
319        }
320        let now = Instant::now();
321        if now >= deadline {
322            return Err(Error::msg("link establishment timed out"));
323        }
324        let (next, _) = cv
325            .wait_timeout(state, deadline.saturating_duration_since(now))
326            .unwrap();
327        state = next;
328    }
329}
330
331fn decode_status(bytes: Vec<u8>) -> Result<Vec<u8>> {
332    let Some((&code, body)) = bytes.split_first() else {
333        return Err(Error::msg("empty response"));
334    };
335    if code == protocol::RES_OK {
336        Ok(body.to_vec())
337    } else {
338        Err(Error::msg(format!(
339            "remote returned status 0x{code:02x}: {}",
340            String::from_utf8_lossy(body)
341        )))
342    }
343}
344
345fn ensure_metadata_ok(metadata: &[u8]) -> Result<()> {
346    let value = msgpack::unpack_exact(metadata)
347        .map_err(|e| Error::msg(format!("invalid response metadata: {e}")))?;
348    let Some(map) = value.as_map() else {
349        return Err(Error::msg("response metadata is not a map"));
350    };
351    let code = map.iter().find_map(|(key, value)| {
352        if matches!(key, Value::UInt(v) if *v == protocol::IDX_RESULT_CODE) {
353            value.as_uint().map(|v| v as u8)
354        } else {
355            None
356        }
357    });
358    match code {
359        Some(protocol::RES_OK) => Ok(()),
360        Some(code) => Err(Error::msg(format!("remote returned status 0x{code:02x}"))),
361        None => Err(Error::msg("response metadata missing status code")),
362    }
363}
364
365#[derive(Debug, Clone, PartialEq, Eq)]
366struct FetchRef {
367    sha: String,
368    name: String,
369}
370
371fn parse_fetch_command(input: &str) -> Result<FetchRef> {
372    let mut parts = input.split_whitespace();
373    let sha = parts
374        .next()
375        .ok_or_else(|| Error::msg("fetch command missing sha"))?;
376    let name = parts
377        .next()
378        .ok_or_else(|| Error::msg("fetch command missing ref"))?;
379    Ok(FetchRef {
380        sha: sha.to_string(),
381        name: name.to_string(),
382    })
383}
384
385#[derive(Debug, Clone, PartialEq, Eq)]
386struct PushSpec {
387    local: String,
388    remote: String,
389    force: bool,
390}
391
392fn parse_push_spec(input: &str) -> Result<PushSpec> {
393    let (force, spec) = input
394        .strip_prefix('+')
395        .map(|s| (true, s))
396        .unwrap_or((false, input));
397    let (local, remote) = spec
398        .split_once(':')
399        .ok_or_else(|| Error::msg("push spec must be <local>:<remote>"))?;
400    Ok(PushSpec {
401        local: local.to_string(),
402        remote: remote.to_string(),
403        force,
404    })
405}
406
407#[derive(Debug, Default)]
408struct ClientOptions {
409    config_dir: Option<PathBuf>,
410    rns_config_dir: Option<PathBuf>,
411    url: String,
412}
413
414impl ClientOptions {
415    fn parse<I>(args: I) -> Result<Self>
416    where
417        I: IntoIterator<Item = String>,
418    {
419        let mut options = ClientOptions::default();
420        let mut positional = Vec::new();
421        let mut args = args.into_iter();
422        while let Some(arg) = args.next() {
423            match arg.as_str() {
424                "-c" | "--config" => {
425                    options.config_dir = Some(PathBuf::from(
426                        args.next()
427                            .ok_or_else(|| Error::msg("missing config path"))?,
428                    ));
429                }
430                "--rnsconfig" => {
431                    options.rns_config_dir = Some(PathBuf::from(
432                        args.next()
433                            .ok_or_else(|| Error::msg("missing RNS config path"))?,
434                    ));
435                }
436                "-h" | "--help" => return Err(Error::msg(usage())),
437                other => positional.push(other.to_string()),
438            }
439        }
440        options.url = positional
441            .last()
442            .cloned()
443            .ok_or_else(|| Error::msg(usage()))?;
444        Ok(options)
445    }
446}
447
448fn usage() -> &'static str {
449    "usage: git-remote-rns [--config DIR] [--rnsconfig DIR] <remote-name> <rns://destination/repo>"
450}
451
452#[cfg(test)]
453mod tests {
454    use super::*;
455
456    #[test]
457    fn parses_fetch_command() {
458        assert_eq!(
459            parse_fetch_command("abc refs/heads/main").unwrap(),
460            FetchRef {
461                sha: "abc".into(),
462                name: "refs/heads/main".into()
463            }
464        );
465    }
466
467    #[test]
468    fn parses_forced_push_spec() {
469        assert_eq!(
470            parse_push_spec("+refs/heads/main:refs/heads/main").unwrap(),
471            PushSpec {
472                local: "refs/heads/main".into(),
473                remote: "refs/heads/main".into(),
474                force: true
475            }
476        );
477    }
478
479    #[test]
480    fn decodes_status_payload() {
481        assert_eq!(
482            decode_status(protocol::status_bytes(protocol::RES_OK, b"refs")).unwrap(),
483            b"refs"
484        );
485        assert!(decode_status(protocol::status_bytes(protocol::RES_NOT_FOUND, b"no")).is_err());
486    }
487
488    #[test]
489    fn metadata_status_ok_is_accepted() {
490        assert!(ensure_metadata_ok(&protocol::metadata_status(protocol::RES_OK)).is_ok());
491        assert!(ensure_metadata_ok(&protocol::metadata_status(protocol::RES_REMOTE_FAIL)).is_err());
492    }
493}