1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
//! The engine is the top-level coordinator that runs and manages all entities
//! in the torrent engine. The user interacts with the engine via the
//! [`EngineHandle`] which exposes a restricted public API. The underlying
//! communication method is [tokio mpsc
//! channels](https://docs.rs/tokio/0.2.16/tokio/sync/mpsc).
//!
//! The engine is spawned as a [tokio
//! task](https://docs.rs/tokio/0.2.16/tokio/task) and runs in the background.
//! As with spawning other tokio tasks, it must be done within the context of
//! a tokio executor.
//!
//! The engine is run until an unrecoverable error occurs, or until the user
//! sends a shutdown command.
//!
//! For usage examples, see the [library documentation](crate).

use std::{
    collections::HashMap,
    net::{Ipv4Addr, SocketAddr},
};

use futures::stream::StreamExt;
use tokio::{
    sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
    task,
};

use crate::{
    alert::{AlertReceiver, AlertSender},
    conf::{Conf, TorrentConf},
    disk::{self, error::NewTorrentError},
    error::*,
    metainfo::Metainfo,
    storage_info::StorageInfo,
    torrent::{self, Torrent},
    tracker::Tracker,
    Bitfield, TorrentId,
};

/// Spawns the engine as a tokio task.
///
/// As with spawning other tokio tasks, it must be done within the context of
/// a tokio executor.
///
/// The return value is a tuple of an [`EngineHandle`], which may be used to
/// send the engine commands, and an [`crate::alert::AlertReceiver`], to which
/// various components in the engine will send alerts of events.
pub fn spawn(conf: Conf) -> Result<(EngineHandle, AlertReceiver)> {
    log::info!("Spawning engine task");

    // create alert channels and return alert port to user
    let (alert_tx, alert_rx) = mpsc::unbounded_channel();
    let (mut engine, tx) = Engine::new(conf, alert_tx)?;

    let join_handle = task::spawn(async move { engine.run().await });
    log::info!("Spawned engine task");

    Ok((
        EngineHandle {
            tx,
            join_handle: Some(join_handle),
        },
        alert_rx,
    ))
}

type JoinHandle = task::JoinHandle<Result<()>>;

/// A handle to the currently running torrent engine.
pub struct EngineHandle {
    tx: Sender,
    join_handle: Option<JoinHandle>,
}

impl EngineHandle {
    /// Creates and starts a torrent, if its metainfo is valid.
    ///
    /// If successful, it returns the id of the torrent. This id can be used to
    /// identify the torrent when issuing further commands to engine.
    pub fn create_torrent(&self, params: TorrentParams) -> Result<TorrentId> {
        log::trace!("Creating torrent");
        let id = TorrentId::new();
        self.tx.send(Command::CreateTorrent { id, params })?;
        Ok(id)
    }

    /// Gracefully shuts down the engine and waits for all its torrents to do
    /// the same.
    ///
    /// # Panics
    ///
    /// This method panics if the engine has already been shut down.
    pub async fn shutdown(mut self) -> Result<()> {
        log::trace!("Shutting down engine task");
        self.tx.send(Command::Shutdown)?;
        if let Err(e) = self
            .join_handle
            .take()
            .expect("engine already shut down")
            .await
            .expect("task error")
        {
            log::error!("Engine error: {}", e);
        }
        Ok(())
    }
}

/// Information for creating a new torrent.
pub struct TorrentParams {
    /// Contains the torrent's metadata.
    pub metainfo: Metainfo,
    /// If set, overrides the default global config.
    pub conf: Option<TorrentConf>,
    /// Whether to download or seed the torrent.
    ///
    /// This is expected to be removed as this will become automatic once
    /// torrent resume data is supported.
    pub mode: Mode,
    /// The address on which the torrent should listen for new peers.
    ///
    /// This has to be unique for each torrent. If not set, or if already in
    /// use, a random port is assigned.
    // TODO: probably use an engine wide address, but requires some
    // rearchitecting
    pub listen_addr: Option<SocketAddr>,
}

/// The download mode.
// TODO: remove in favor of automatic detection
// TODO: when seeding is specified, we need to verify that the files to be
// seeded exist and are complete
#[derive(Debug)]
pub enum Mode {
    Download { seeds: Vec<SocketAddr> },
    Seed,
}

/// The channel through which the user can send commands to the engine.
pub(crate) type Sender = UnboundedSender<Command>;
/// The channel on which the engine listens for commands from the user.
type Receiver = UnboundedReceiver<Command>;

/// The type of commands that the engine can receive.
pub(crate) enum Command {
    /// Contains the information for creating a new torrent.
    CreateTorrent {
        id: TorrentId,
        params: TorrentParams,
    },
    /// Torrent allocation result. If successful, the id of the allocated
    /// torrent is returned for identification, if not, the reason of the error
    /// is included.
    TorrentAllocation {
        id: TorrentId,
        result: Result<(), NewTorrentError>,
    },
    /// Gracefully shuts down the engine and waits for all its torrents to do
    /// the same.
    Shutdown,
}

struct Engine {
    /// All currently running torrents in engine.
    torrents: HashMap<TorrentId, TorrentEntry>,

    /// The port on which other entities in the engine, or the API consumer
    /// sends the engine commands.
    cmd_rx: Receiver,

    /// The disk channel.
    disk_tx: disk::Sender,
    disk_join_handle: Option<disk::JoinHandle>,

    /// The channel on which tasks in the engine post alerts to user.
    alert_tx: AlertSender,

    /// The global engine configuration that includes defaults for torrents
    /// whose config is not overridden.
    conf: Conf,
}

/// A running torrent's entry in the engine.
struct TorrentEntry {
    /// The torrent's command channel on which engine sends commands to torrent.
    tx: torrent::Sender,
    /// The torrent task's join handle, used during shutdown.
    join_handle: Option<task::JoinHandle<torrent::error::Result<()>>>,
}

impl Engine {
    /// Creates a new engine, spawning the disk task.
    fn new(conf: Conf, alert_tx: AlertSender) -> Result<(Self, Sender)> {
        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
        let (disk_join_handle, disk_tx) = disk::spawn(cmd_tx.clone())?;

        Ok((
            Self {
                torrents: HashMap::new(),
                cmd_rx,
                disk_tx,
                disk_join_handle: Some(disk_join_handle),
                alert_tx,
                conf,
            },
            cmd_tx,
        ))
    }

    /// Runs the engine until an unrecoverable error occurs, or until the user
    /// sends a shutdown command.
    async fn run(&mut self) -> Result<()> {
        log::info!("Starting engine");

        while let Some(cmd) = self.cmd_rx.next().await {
            match cmd {
                Command::CreateTorrent { id, params } => {
                    self.create_torrent(id, params).await?;
                }
                Command::TorrentAllocation { id, result } => match result {
                    Ok(_) => {
                        log::info!("Torrent {} allocated on disk", id);
                    }
                    Err(e) => {
                        log::error!(
                            "Error allocating torrent {} on disk: {}",
                            id,
                            e
                        );
                    }
                },
                Command::Shutdown => {
                    self.shutdown().await?;
                    break;
                }
            }
        }

        Ok(())
    }

    /// Creates and spawns a new torrent based on the parameters given.
    async fn create_torrent(
        &mut self,
        id: TorrentId,
        params: TorrentParams,
    ) -> Result<()> {
        let conf = params.conf.unwrap_or_else(|| self.conf.torrent.clone());
        let storage_info = StorageInfo::new(
            &params.metainfo,
            self.conf.engine.download_dir.clone(),
        );
        // TODO: don't duplicate trackers if multiple torrents use the same
        // ones (common in practice)
        let trackers = params
            .metainfo
            .trackers
            .into_iter()
            .map(Tracker::new)
            .collect();
        let own_pieces = params.mode.own_pieces(storage_info.piece_count);

        // create and spawn torrent
        // TODO: For now we spawn automatically, but later when we add torrent
        // pause/restart APIs, this will be a separate step. There should be
        // a `start` flag in `params` that says whether to immediately spawn
        // a new torrent (or maybe in `TorrentConf`).
        let (mut torrent, torrent_tx) = Torrent::new(torrent::Params {
            id,
            disk_tx: self.disk_tx.clone(),
            info_hash: params.metainfo.info_hash,
            storage_info: storage_info.clone(),
            own_pieces,
            trackers,
            client_id: self.conf.engine.client_id,
            listen_addr: params.listen_addr.unwrap_or_else(|| {
                // the port 0 tells the kernel to assign a free port from the
                // dynamic range
                SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0)
            }),
            conf,
            alert_tx: self.alert_tx.clone(),
        });

        // Allocate torrent on disk. This is an asynchronous process and we can
        // start the torrent in the meantime.
        //
        // Technically we could have issues if the torrent connects to peers
        // that send data before we manage to allocate the (empty) files on
        // disk. However, this should be an extremely pathological case for
        // 2 reasons:
        // - Most torrents would be started without peers, so a torrent would
        //   have to wait for peers from its tracker(s). This should be
        //   a sufficiently long time to allocate torrent on disk.
        // - Then, even if we manage to connect peers quickly, testing shows
        //   that they don't tend to unchoke us immediately.
        //
        // Thus there is little chance to receive data and thus cause a disk
        // write or disk read immediatey.
        self.disk_tx.send(disk::Command::NewTorrent {
            id,
            storage_info: storage_info.clone(),
            piece_hashes: params.metainfo.pieces,
            torrent_tx: torrent_tx.clone(),
        })?;

        let seeds = params.mode.seeds();
        let join_handle =
            task::spawn(async move { torrent.start(&seeds).await });

        self.torrents.insert(
            id,
            TorrentEntry {
                tx: torrent_tx,
                join_handle: Some(join_handle),
            },
        );

        Ok(())
    }

    /// Gracefully shuts down the engine and all its components.
    async fn shutdown(&mut self) -> Result<()> {
        log::info!("Shutting down engine");

        // tell all torrents to shut down and join their tasks
        for torrent in self.torrents.values_mut() {
            // the torrent task may no longer be running, so don't panic here
            torrent.tx.send(torrent::Command::Shutdown).ok();
        }
        // Then join all torrent task handles. Shutting down a torrent may take
        // a while, so join as a separate step to first initiate the shutdown of
        // all torrents.
        for torrent in self.torrents.values_mut() {
            // FIXME: if torrent task is not running, does this panic?
            if let Err(e) = torrent
                .join_handle
                .take()
                .expect("torrent join handle missing")
                .await
                .expect("task error")
            {
                log::error!("Torrent error: {}", e);
            }
        }

        // send a shutdown command to disk
        self.disk_tx.send(disk::Command::Shutdown)?;
        // and join on its handle
        self.disk_join_handle
            .take()
            .expect("disk join handle missing")
            .await
            .expect("Disk task has panicked")
            .map_err(Error::from)?;

        return Ok(());
    }
}

impl Mode {
    fn own_pieces(&self, piece_count: usize) -> Bitfield {
        match self {
            Self::Download { .. } => Bitfield::repeat(false, piece_count),
            Self::Seed => Bitfield::repeat(true, piece_count),
        }
    }

    fn seeds(self) -> Vec<SocketAddr> {
        match self {
            Self::Download { seeds } => seeds,
            _ => Vec::new(),
        }
    }
}