fire_stream/
client.rs

1use crate::plain;
2use crate::error::{TaskError, RequestError};
3use crate::handler::{
4	TaskHandle, client::Sender, StreamSender, StreamReceiver, Configurator
5};
6use crate::packet::{Packet, PlainBytes};
7use crate::util::{PinnedFuture, ByteStream};
8
9#[cfg(feature = "encrypted")]
10use crate::{encrypted, packet::EncryptedBytes};
11#[cfg(feature = "encrypted")]
12use crypto::signature as sign;
13
14use std::time::Duration;
15use std::io;
16
17
18#[derive(Debug, Clone)]
19pub struct Config {
20	pub timeout: Duration,
21	/// if the limit is 0 there is no limit
22	pub body_limit: u32
23}
24
25/// Reconnection strategy
26/// 
27/// You should probably add a timeout before reconnecting
28pub struct ReconStrat<S> {
29	// async fn(error_count: usize) -> io::Result<S>
30	pub(crate) inner: Box<
31		dyn FnMut(usize) -> PinnedFuture<'static, io::Result<S>> + Send
32	>
33}
34
35impl<S> ReconStrat<S> {
36	/// Expects the following fn:
37	/// ```ignore
38	/// async fn new_stream(error_count: usize) -> io::Result<S>;
39	/// ```
40	pub fn new<F: 'static>(f: F) -> Self
41	where F: FnMut(usize) -> PinnedFuture<'static, io::Result<S>> + Send {
42		Self {
43			inner: Box::new(f)
44		}
45	}
46}
47
48/// A connection to a server
49pub struct Connection<P> {
50	sender: Sender<P>,
51	task: TaskHandle
52}
53
54impl<P> Connection<P> {
55	/// Creates a new connection to a server with an existing stream
56	pub fn new<S>(
57		byte_stream: S,
58		cfg: Config,
59		recon_strat: Option<ReconStrat<S>>
60	) -> Self
61	where
62		S: ByteStream,
63		P: Packet<PlainBytes> + Send + 'static,
64		P::Header: Send
65	{
66		plain::client(byte_stream, cfg, recon_strat)
67	}
68
69	/// Creates a new connection to a server and encrypting all packets sent
70	#[cfg(feature = "encrypted")]
71	#[cfg_attr(docsrs, doc(cfg(feature = "encrypted")))]
72	pub fn new_encrypted<S>(
73		byte_stream: S,
74		cfg: Config,
75		recon_strat: Option<ReconStrat<S>>,
76		sign: sign::PublicKey
77	) -> Self
78	where
79		S: ByteStream,
80		P: Packet<EncryptedBytes> + Send + 'static,
81		P::Header: Send
82	{
83		encrypted::client(byte_stream, cfg, recon_strat, sign)
84	}
85
86	/// Creates a new Stream.
87	pub(crate) fn new_raw(sender: Sender<P>, task: TaskHandle) -> Self {
88		Self { sender, task }
89	}
90
91	/// Update the connection configuration
92	pub fn update_config(&self, cfg: Config) {
93		self.sender.update_config(cfg);
94	}
95
96	/// Get's a `Configurator` which allows to configure this connection
97	/// without needing to have access to the connection
98	pub fn configurator(&self) -> Configurator<Config> {
99		self.sender.configurator()
100	}
101
102	/// Send a request waiting until a response is available or the connection
103	/// closes
104	/// 
105	/// ## Errors
106	/// - Writing the packet failed
107	/// - Reading the response packet failed
108	/// - Io Error
109	pub async fn request(&self, packet: P) -> Result<P, RequestError> {
110		self.sender.request(packet).await
111	}
112
113	/// Create a new stream to send packets.
114	pub async fn request_sender(
115		&self,
116		packet: P
117	) -> Result<StreamSender<P>, RequestError> {
118		self.sender.request_sender(packet).await
119	}
120
121	/// Opens a new stream to listen to packets.
122	pub async fn request_receiver(
123		&self,
124		packet: P
125	) -> Result<StreamReceiver<P>, RequestError> {
126		self.sender.request_receiver(packet).await
127	}
128
129	// /// Create a new stream to send packets.
130	// pub async fn request_stream(&self, packet: P) -> Result<StreamSender<P>> {
131	// 	self.sender.create_stream(packet).await
132	// }
133
134	/// Wait until the connection has nothing more todo which will then close
135	/// the connection.
136	pub async fn wait(self) -> Result<(), TaskError> {
137		self.task.wait().await
138	}
139
140	/// Send a close signal to the background task and wait until it closes.
141	pub async fn close(self) -> Result<(), TaskError> {
142		self.task.close().await
143	}
144}