rs162 0.1.2

NMEA AIS Message Parser and Decoder with deku-based AIS message structures
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
//! SSH tunneling support for AIS sources
//!
//! This module provides SSH tunneling capabilities for TCP and MQTT sources,
//! allowing secure connections through jump hosts.

use makiko::{
    ChannelConfig, Client, ClientConfig, ClientReceiver, Privkey, TunnelReceiver, TunnelStream,
};
use once_cell::sync::Lazy;
use ssh2_config::{ParseRule, SshConfig};
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio::{
    io::{AsyncRead, AsyncWrite},
    process::{ChildStdin, ChildStdout, Command},
};
use tracing::{error, info, warn};

/// Boxed, thread-safe error type returned by the fallible functions in this module.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Process-wide cache of SSH clients, keyed by the server name that was passed
/// to `connect_server`. A server that already has an entry is not connected to
/// again; its cached client is cloned and returned instead.
pub static CONNECTION_MAP: Lazy<Arc<Mutex<HashMap<String, Client>>>> =
    Lazy::new(|| Arc::new(Mutex::new(HashMap::new())));

/// Configuration for a TCP connection that is tunnelled through an SSH server.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TunnelledTcp {
    /// Host name or address of the final TCP endpoint the tunnel connects to.
    pub address: String,
    /// TCP port of the final endpoint.
    pub port: u16,
    /// Name of the SSH server to tunnel through; it is looked up in the SSH
    /// configuration file when connecting.
    pub jump: String,
}

async fn authenticate_server(mut client_rx: ClientReceiver, host: String, port: u16) {
    let mut hosts_path = dirs::home_dir().unwrap();
    hosts_path.push(".ssh");
    hosts_path.push("known_hosts");

    let hosts_data = std::fs::read(&hosts_path).expect("Could not read known_hosts file");

    let mut hosts_file = makiko::host_file::File::decode(hosts_data.into());

    loop {
        // Wait for the next event.
        let event = client_rx
            .recv()
            .await
            .expect("Error while receiving client event");

        // Exit the loop when the client has closed.
        let Some(event) = event else { break };

        if let makiko::ClientEvent::ServerPubkey(pubkey, accept) = event {
            info!(
                "Server pubkey type {}, fingerprint {}",
                pubkey.type_str(),
                pubkey.fingerprint()
            );

            match hosts_file.match_host_port_key(&host, port, &pubkey) {
                makiko::host_file::KeyMatch::Accepted(entries) => {
                    info!("Found the server key in known_hosts file");
                    for entry in entries.iter() {
                        info!("At line {}", entry.line());
                    }
                    accept.accept();
                }
                makiko::host_file::KeyMatch::Revoked(_entry) => {
                    panic!("The server key was revoked in known_hosts file");
                }
                makiko::host_file::KeyMatch::OtherKeys(entries) => {
                    warn!("The known_hosts file specifies other keys for this server:");
                    for entry in entries.iter() {
                        warn!(
                            "At line {}, pubkey type {}, fingerprint {}",
                            entry.line(),
                            entry.pubkey().type_str(),
                            entry.pubkey().fingerprint()
                        );
                    }
                    panic!("Aborting, you might be target of a man-in-the-middle attack!");
                }
                makiko::host_file::KeyMatch::NotFound => {
                    info!(
                        "Did not find any key for this server in known_hosts file, \
                            adding it to the file"
                    );

                    accept.accept();

                    hosts_file.append_entry(
                        makiko::host_file::File::entry_builder()
                            .host_port(&host, port)
                            .key(pubkey),
                    );
                    let hosts_data = hosts_file.encode();
                    std::fs::write(&hosts_path, &hosts_data)
                        .expect("Could not write the modified known_hosts file");
                }
            }
        }
    }
}

/// Authenticates `user` on `client` with the given private key.
///
/// Every signature algorithm supported by the key's public half is tried in
/// order: the server is first asked whether it would accept the combination,
/// and only then is a real authentication attempt made. The function returns
/// as soon as one attempt succeeds.
///
/// # Panics
///
/// Panics if checking the key or authenticating returns an error, or if no
/// algorithm leads to a successful authentication.
async fn authenticate_by_private_key(client: &Client, user: &str, privkey: &Privkey) {
    let public_key = privkey.pubkey();

    for &algorithm in public_key.algos().iter() {
        // Skip combinations the server would not accept anyway.
        let acceptable = client
            .check_pubkey(user.to_string(), &public_key, algorithm)
            .await
            .expect("Error when checking a public key");
        if !acceptable {
            continue;
        }

        // The server is willing: attempt the actual authentication.
        let outcome = client
            .auth_pubkey(user.to_string(), privkey.clone(), algorithm)
            .await
            .expect("Error when trying to authenticate");

        match outcome {
            makiko::AuthPubkeyResult::Failure(failure) => {
                info!(
                    "The server rejected authentication with {:?}: {:?}",
                    algorithm, failure
                );
            }
            makiko::AuthPubkeyResult::Success => {
                info!("We have successfully authenticated using a private key");
                return;
            }
        }
    }

    panic!("The server does not accept the public key");
}

/// Loads and parses the user's SSH configuration from `~/.ssh/config`.
///
/// Unknown and unsupported fields are tolerated by the parser rules used here.
///
/// # Panics
///
/// Panics if the home directory cannot be determined, if the configuration
/// file cannot be opened, or if it cannot be parsed.
fn get_params() -> SshConfig {
    let mut config_path = dirs::home_dir().unwrap();
    config_path.push(".ssh");
    config_path.push("config");

    let file = match File::open(&config_path) {
        Ok(file) => file,
        Err(e) => panic!("{config_path:?} does not exist: {e:?}"),
    };
    let mut reader = BufReader::new(file);

    let rules = ParseRule::ALLOW_UNKNOWN_FIELDS | ParseRule::ALLOW_UNSUPPORTED_FIELDS;
    match SshConfig::default().parse(&mut reader, rules) {
        Ok(config) => config,
        Err(_) => panic!("Failed to parse configuration file {config_path:?}"),
    }
}

/// Returns the name of the current user as reported by the environment:
/// `USERNAME` on Windows, `USER` everywhere else.
///
/// # Panics
///
/// Panics if the relevant environment variable is unset or is not valid
/// Unicode.
fn get_default_username() -> String {
    let (variable, failure) = if cfg!(target_os = "windows") {
        (
            "USERNAME",
            "Could not determine the current Windows user name",
        )
    } else {
        ("USER", "Could not determine the current user name")
    };

    match std::env::var(variable) {
        Ok(name) => name,
        Err(_) => panic!("{}", failure),
    }
}

/// Transport over which the SSH client for a server is opened.
enum Io {
    /// Direct TCP connection to the server.
    Tcp(TcpStream),
    /// Tunnel opened through the SSH client of a jump host (`proxyjump`).
    Tunnel(TunnelStream),
    /// Standard input/output of a spawned proxy command (`proxycommand`).
    Proxy(ProxyCommand),
}

/**
 * This function connects to a server using SSH. It handles proxy commands
 * and proxy jumps. It also handles authentication using private keys.
 * It returns a Client object that can be used to interact with the server.
 */
#[async_recursion::async_recursion]
async fn connect_server(
    server: &str,
    params: &SshConfig,
    connection_map: Arc<Mutex<HashMap<String, Client>>>,
) -> Result<Client, BoxError> {
    // Check if the server is already connected
    // If so, return the existing connection
    if connection_map.lock().await.contains_key(server) {
        info!("Reusing existing connection to {server}");
        return Ok(connection_map.lock().await.get(server).unwrap().clone());
    }

    // Otherwise create a new connection
    let server_params = params.query(server);
    let hostname = server_params.host_name.unwrap();
    let port = server_params.port.unwrap_or(22);
    let user = server_params.user.unwrap_or(get_default_username());

    let io = match server_params.unsupported_fields.get("proxyjump") {
        None => match server_params.unsupported_fields.get("proxycommand") {
            None => Io::Tcp(TcpStream::connect((hostname.to_owned(), port)).await?),
            Some(args) => {
                let mut command = Command::new(args[0].clone());
                for arg in args[1..].iter() {
                    let arg = arg
                        // Replace %% with %
                        .replace("%%", "%")
                        // Replace the following placeholders with actual values
                        .replace("%h", &hostname)
                        .replace("%p", &port.to_string());
                    command.arg(arg);
                }
                info!("Executing proxy command: {command:?}");
                Io::Proxy(ProxyCommand::new(
                    command
                        .stdin(std::process::Stdio::piped())
                        .stdout(std::process::Stdio::piped())
                        .stderr(std::process::Stdio::piped()),
                ))
            }
        },
        Some(jump) => {
            let jump_server = jump.first().expect("No jump host specified");
            let jump_client = connect_server(jump_server, params, connection_map.clone()).await?;
            let channel_config = ChannelConfig::default();
            let origin_addr = ("127.0.0.1".into(), 0);
            let (tunnel, tunnel_rx) = jump_client
                .connect_tunnel(channel_config, (hostname.to_owned(), port), origin_addr)
                .await
                .expect("Could not open a tunnel");
            Io::Tunnel(TunnelStream::new(tunnel, tunnel_rx))
        }
    };

    let config = ClientConfig::default();
    let (client, client_rx) = match io {
        Io::Tcp(socket) => {
            let (client, client_rx, client_fut) = Client::open(socket, config)?;
            tokio::spawn(async move {
                client_fut.await.expect("Error in client future");
            });
            (client, client_rx)
        }
        Io::Tunnel(io) => {
            let (client, client_rx, client_fut) = Client::open(io, config)?;
            tokio::spawn(async move {
                client_fut.await.expect("Error in client future");
            });
            (client, client_rx)
        }
        Io::Proxy(io) => {
            let (client, client_rx, client_fut) = Client::open(io, config)?;
            tokio::spawn(async move {
                client_fut.await.expect("Error in client future");
            });
            (client, client_rx)
        }
    };

    tokio::task::spawn(authenticate_server(client_rx, hostname, port));

    let ssh_folder = dirs::home_dir().unwrap().join(".ssh");
    let mut decoded_privkey = None;
    let identity_files = server_params
        .identity_file
        .unwrap_or_else(|| vec![ssh_folder.join("id_rsa"), ssh_folder.join("id_ed25519")]);
    for file in identity_files.iter() {
        let filename = file.as_os_str();
        if let Ok(privkey) = tokio::fs::read(file).await {
            if let Ok(passphrase) = std::env::var("SSH_PASSPHRASE") {
                info!("Decoding private key {:?} with passphrase", &filename);
                if let Ok(res) = makiko::keys::decode_pem_privkey(&privkey, passphrase.as_bytes()) {
                    decoded_privkey = Some(res);
                    break;
                } else {
                    info!("Could not decode a private key from pem {:?}", &filename);
                    continue;
                }
            } else if let Ok(privkey) = std::fs::read(file) {
                if let Ok(data) = makiko::keys::decode_pem_privkey_nopass(&privkey) {
                    if let Some(key) = data.privkey().cloned() {
                        info!(
                            "Successfully decoded a private key {:?} without passphrase",
                            &filename
                        );
                        decoded_privkey = Some(key);
                        break;
                    }
                } else {
                    info!("Could not decode a private key from pem {:?}", &filename);
                    continue;
                }
            } else {
                info!("Identity file not found {:?}", &filename);
                continue;
            };
        }
    }
    let privkey = decoded_privkey.expect("None of the identity files could be decoded");
    authenticate_by_private_key(&client, &user, &privkey).await;

    connection_map
        .lock()
        .await
        .insert(server.to_string(), client.clone());

    Ok(client)
}

impl TunnelledTcp {
    /// Opens a tunnel to `address:port` through the SSH server named by `jump`
    /// and returns a reader for the data arriving on that tunnel.
    ///
    /// The SSH connection is obtained through `connect_server` using the
    /// shared `CONNECTION_MAP`, so an existing connection to the jump host is
    /// reused. Only the receiving half of the tunnel is kept; the tunnel
    /// handle is dropped when this function returns.
    ///
    /// # Errors
    ///
    /// Returns an error if the jump host cannot be connected to or if the
    /// tunnel to the target address cannot be opened.
    ///
    /// # Panics
    ///
    /// Panics if the SSH configuration cannot be loaded (see `get_params`).
    pub async fn connect(&self) -> Result<TunnelReader, BoxError> {
        let params = get_params();

        let target_client = connect_server(&self.jump, &params, CONNECTION_MAP.clone())
            .await
            .map_err(|e| {
                BoxError::from(format!(
                    "Could not connect to jump host {}: {}",
                    self.jump, e
                ))
            })?;

        let channel_config = makiko::ChannelConfig::default();
        let connect_addr = (self.address.clone(), self.port);
        let origin_addr = ("0.0.0.0".into(), 0);

        // A tunnel failure is an ordinary runtime condition: report it to the
        // caller instead of panicking. The message is only built on failure.
        let (_tunnel, tunnel_rx) = target_client
            .connect_tunnel(channel_config, connect_addr, origin_addr)
            .await
            .map_err(|e| {
                BoxError::from(format!(
                    "Could not open a tunnel to {:?}: {}",
                    (&self.address, self.port),
                    e
                ))
            })?;

        Ok(TunnelReader::new(tunnel_rx))
    }
}

/// Adapter that exposes the data events of a `TunnelReceiver` as an
/// `AsyncRead` byte stream.
pub struct TunnelReader {
    /// Receiver that is polled for tunnel events.
    rx: TunnelReceiver,
    /// Bytes of a received chunk that did not fit into the caller's buffer.
    buffer: Vec<u8>,
    /// Offset into `buffer` of the first byte not yet handed to the caller.
    pos: usize,
}

impl TunnelReader {
    pub fn new(rx: TunnelReceiver) -> Self {
        Self {
            rx,
            buffer: Vec::new(),
            pos: 0,
        }
    }
}

impl AsyncRead for TunnelReader {
    /// Reads bytes from the tunnel into `buf`.
    ///
    /// Bytes left over from a previous chunk are served first. Otherwise the
    /// receiver is polled until it yields a non-empty data chunk, closes,
    /// fails, or has nothing ready. Whatever part of a chunk does not fit into
    /// `buf` is kept for the next call.
    ///
    /// Completing without adding bytes to `buf` happens only when the tunnel
    /// has closed (or `buf` has no room); empty data chunks are skipped so
    /// that they are not mistaken for end of stream.
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        use std::task::Poll;

        let this = self.get_mut();

        // If we have buffered data, return it first
        if this.pos < this.buffer.len() {
            let pending = &this.buffer[this.pos..];
            let to_copy = pending.len().min(buf.remaining());
            buf.put_slice(&pending[..to_copy]);
            this.pos += to_copy;

            // Clear buffer if fully consumed
            if this.pos >= this.buffer.len() {
                this.buffer.clear();
                this.pos = 0;
            }

            return Poll::Ready(Ok(()));
        }

        // Otherwise keep polling the tunnel until there is something to
        // report. Looping on non-data events lets the receiver register the
        // waker itself once it is out of events, instead of waking this task
        // just to poll again.
        loop {
            match this.rx.poll_recv(cx) {
                Poll::Ready(Ok(Some(makiko::TunnelEvent::Data(data)))) => {
                    if data.is_empty() {
                        // A 0-byte read would be taken as end of stream.
                        continue;
                    }

                    let to_copy = data.len().min(buf.remaining());
                    buf.put_slice(&data[..to_copy]);

                    // If we couldn't fit all data, buffer the rest, reusing
                    // the existing allocation.
                    if to_copy < data.len() {
                        this.buffer.clear();
                        this.buffer.extend_from_slice(&data[to_copy..]);
                        this.pos = 0;
                    }

                    return Poll::Ready(Ok(()));
                }
                // Other tunnel events carry no payload for the reader.
                Poll::Ready(Ok(Some(_))) => continue,
                // Tunnel closed: report end of stream.
                Poll::Ready(Ok(None)) => return Poll::Ready(Ok(())),
                Poll::Ready(Err(e)) => {
                    error!("Error reading from tunnel: {}", e);
                    return Poll::Ready(Err(std::io::Error::other(e)));
                }
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

/// Bidirectional stream backed by the standard input and output of a spawned
/// child process: writes go to the child's stdin, reads come from its stdout.
#[derive(Debug)]
pub struct ProxyCommand {
    /// Pipe connected to the child's standard input.
    stdin: ChildStdin,
    /// Pipe connected to the child's standard output.
    stdout: ChildStdout,
}

impl ProxyCommand {
    pub fn new(command: &mut Command) -> Self {
        let mut command = command.spawn().expect("failed to spawn");
        let stdin = command.stdin.take().expect("failed to open stdin");
        let stdout = command.stdout.take().expect("failed to open stdout");
        ProxyCommand { stdin, stdout }
    }
}

impl AsyncRead for ProxyCommand {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.stdout).poll_read(cx, buf)
    }
}

impl AsyncWrite for ProxyCommand {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<std::io::Result<usize>> {
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.stdin).poll_write(cx, buf)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.stdin).poll_flush(cx)
    }

    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::io::Result<()>> {
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.stdin).poll_shutdown(cx)
    }
}