1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
//! Receive incoming Iridium messages through their Direct IP service.
//!
//! Iridium `DirectIP` is a service provided by the Iridium company. New Mobile
//! Originated messages are forwarded from the Iridium servers to a configured
//! IP address. The Iridium service attempts to initiate a TCP connection to port
//! 10800 at the specified IP. If the connection is successful, the MO message
//! is transmitted, then the connection is closed.
//!
//! This module provides a `Server` structure, which can be created to run
//! forever and receive those incoming MO messages.

use std::{
    io,
    net::{TcpListener, TcpStream, ToSocketAddrs},
    sync::{Arc, Mutex},
    thread,
};

use log::{debug, error, info, warn};

use crate::{mo::Message, storage::Storage};

/// An Iridium `DirectIP` server.
///
/// A server pairs a socket address with a message store. Once running, it accepts incoming
/// Iridium SBD Mobile Originated (MO) messages on that address and hands each one to the
/// provided `Storage` implementation. Problems encountered along the way are reported through
/// the logging framework rather than returned to the caller.
///
/// The storage lives behind an `Arc<Mutex<_>>` so that it can be shared with the threads that
/// handle individual connections.
#[derive(Debug)]
pub struct Server<A, S>
where
    A: ToSocketAddrs + Sync,
    S: Storage + Sync + Send,
{
    /// The address (or addresses) the server listens on.
    addr: A,
    /// The bound listener; `None` until the server has been bound.
    listener: Option<TcpListener>,
    /// The shared message store.
    storage: Arc<Mutex<S>>,
}

impl<A, S> Server<A, S>
where
    A: ToSocketAddrs + Sync,
    S: 'static + Storage + Sync + Send,
{
    /// Creates a new server that will listen on `addr` and write messages to `storage`.
    ///
    /// This method does not actually bind to the socket address or do anything with the storage.
    /// Use `bind` and `serve_forever` to actually do stuff.
    ///
    /// The provided storage is expected to be ready to accept new messages.
    ///
    /// # Examples
    ///
    /// ```
    /// let storage = sbd::storage::MemoryStorage::new();
    /// let server = sbd::directip::Server::new("0.0.0.0:10800", storage);
    /// ```
    pub fn new(addr: A, storage: S) -> Server<A, S> {
        Server {
            addr,
            listener: None,
            storage: Arc::new(Mutex::new(storage)),
        }
    }

    /// Binds this server to its tcp socket.
    ///
    /// This is a separate operation from `serve_forever` so that we can capture any errors
    /// associated with the underlying `TcpListener::bind`.
    ///
    /// # Errors
    ///
    /// Returns the `io::Error` produced by `TcpListener::bind` if the socket address cannot be
    /// bound. The server is left unbound in that case.
    ///
    /// # Examples
    ///
    /// ```
    /// let storage = sbd::storage::MemoryStorage::new();
    /// let mut server = sbd::directip::Server::new("0.0.0.0:10800", storage);
    /// server.bind().unwrap();
    /// ```
    pub fn bind(&mut self) -> io::Result<()> {
        self.listener = Some(self.create_listener()?);
        Ok(())
    }

    /// Starts the `DirectIP` server and serves forever.
    ///
    /// Every accepted connection is handed to its own thread, which reads one message from the
    /// stream and stores it. Errors yielded while accepting connections are logged directly on
    /// the accepting thread.
    ///
    /// # Panics
    ///
    /// If the server has not been bound yet, this method binds it first and panics if that
    /// fails. To avoid a panic, use `Server::bind` before calling `Server::serve_forever`.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// let storage = sbd::storage::MemoryStorage::new();
    /// let mut server = sbd::directip::Server::new("0.0.0.0:10800", storage);
    /// server.bind().unwrap();
    /// server.serve_forever();
    /// ```
    pub fn serve_forever(mut self) {
        // `self` is consumed, so the listener can simply be moved out of the option instead of
        // being borrowed back from it.
        let listener = match self.listener.take() {
            Some(listener) => listener,
            None => self
                .create_listener()
                .expect("unable to bind the DirectIP server to its socket address"),
        };
        for stream in listener.incoming() {
            match stream {
                Ok(stream) => {
                    let storage = Arc::clone(&self.storage);
                    thread::spawn(move || handle_stream(stream, storage));
                }
                // Logging is cheap; spawning a thread per accept error would only add load
                // while the listener is already failing.
                Err(err) => handle_error(&err),
            }
        }
    }

    /// Creates a `TcpListener` bound to this server's address.
    fn create_listener(&self) -> io::Result<TcpListener> {
        TcpListener::bind(&self.addr)
    }
}

/// Processes a single incoming `DirectIP` connection.
///
/// The peer address is logged when it can be determined; failing to determine it is only a
/// warning and processing continues. One Mobile Originated message is then read from the
/// stream. A read failure is logged and ends processing. Otherwise the message is summarised
/// in the log and passed to `storage`, and the outcome of storing it is logged as well.
///
/// # Panics
///
/// Panics if the storage mutex cannot be locked.
fn handle_stream(stream: TcpStream, storage: Arc<Mutex<dyn Storage>>) {
    match stream.peer_addr() {
        Err(err) => warn!(
            "Problem when extracting peer address from TcpStream, but we'll press on: {:?}",
            err
        ),
        Ok(peer) => debug!("Handling TcpStream from {}", peer),
    }

    let message = match Message::read_from(stream) {
        Ok(message) => message,
        Err(err) => {
            error!("Error when reading message: {:?}", err);
            return;
        }
    };

    let imei = message.imei();
    let momsn = message.momsn();
    let payload_len = message.payload().len();
    info!(
        "Recieved message from IMEI {} with MOMN {} and {} byte payload",
        imei, momsn, payload_len,
    );

    // The lock guard is a temporary of the match scrutinee, so the result is logged while the
    // storage is still locked.
    match storage
        .lock()
        .expect("unable to lock storage mutex")
        .store(message)
    {
        Err(err) => error!("Problem storing message: {:?}", err),
        Ok(_) => info!("Stored message"),
    }
}

/// Handles an error when handling a connection.
///
/// The error is only reported: it is written to the log at the error level using its `Debug`
/// representation, and nothing is returned to the caller. `Server::serve_forever` uses this
/// for errors yielded while accepting incoming connections.
fn handle_error(err: &io::Error) {
    error!("Error when receiving tcp communication: {:?}", err);
}