use std::{
borrow::Cow,
fmt::Display,
io::{self, Read, Write},
net::SocketAddr,
path::Path,
sync::mpsc::Receiver,
thread::JoinHandle,
};
use anyhow::Context;
use crate::{
broadcast::receiver::{BroadcastReceiver, PayloadReader},
pb::ProgressBar,
tf::{sender_receive_file, sender_send_file},
};
/// Description of a receiver that exposes a socket address.
///
/// `run_v1_0` requires this bound on the type it passes to the broadcast
/// discovery step.
/// NOTE(review): presumably implemented by the payload type decoded from
/// broadcast packets — confirm against the `broadcast` module.
pub trait ReceiverData {
/// Returns the socket address associated with this receiver.
/// NOTE(review): presumably the address the sender should connect to — confirm.
fn addr(&self) -> SocketAddr;
}
/// Application-side hooks and configuration consumed by [`run_v1_0`].
///
/// The driver calls the hooks in this order: [`App::preprocess_connection`],
/// [`App::auth`], (version handshake), [`App::upgrade_stream`],
/// [`App::postprocess_connection`]. Hooks with a default body are optional.
pub trait App {
/// Connection type produced by the `connect` callback given to [`run_v1_0`].
type Stream: Read + Write;
/// Stream type returned by [`App::upgrade_stream`]; the file-transfer phase
/// of [`run_v1_0`] runs over it.
type UpgradeStream: Read + Write;
/// Prefix handed to the broadcast receiver builder when discovery is needed.
/// NOTE(review): presumably used to recognise this application's broadcast
/// packets — confirm against `BroadcastReceiver`.
fn prefix(&self) -> &str;
/// Address the broadcast receiver is bound to during discovery.
fn broadcast_addr(&self) -> SocketAddr;
/// Explicit receiver address, if one is known.
///
/// When this returns `Some`, [`run_v1_0`] skips broadcast discovery and
/// connects to the returned address directly.
fn receiver_addr(&self) -> Option<SocketAddr>;
/// Directory used for downloads.
/// NOTE(review): not referenced in this part of the file; presumably where
/// files received from the peer are written — confirm against the `tf` module.
fn download_dir<'a>(&'a self) -> Cow<'a, Path>;
/// Hook invoked right after the connection is established, before
/// authentication. The default implementation does nothing.
fn preprocess_connection(&self, stream: &mut Self::Stream) -> anyhow::Result<()> {
// Explicitly mark the parameter as unused in the default no-op.
let _ = stream;
Ok(())
}
/// Authenticates over the raw stream.
///
/// Returning `Ok(false)` makes [`run_v1_0`] abort with an
/// "authentication failed" error. The default implementation always
/// returns `Ok(true)`.
fn auth(&self, stream: &mut Self::Stream) -> anyhow::Result<bool> {
// Explicitly mark the parameter as unused in the default no-op.
let _ = stream;
Ok(true)
}
/// Converts the raw stream into the stream used for file transfer.
///
/// Called by [`run_v1_0`] once the peer has accepted the protocol version.
/// NOTE(review): presumably the place to layer encryption or compression —
/// confirm with implementors.
fn upgrade_stream(&self, stream: Self::Stream) -> anyhow::Result<Self::UpgradeStream>;
/// Hook invoked on the upgraded stream before any file is transferred.
/// The default implementation does nothing.
fn postprocess_connection(&self, stream: &mut Self::UpgradeStream) -> anyhow::Result<()> {
// Explicitly mark the parameter as unused in the default no-op.
let _ = stream;
Ok(())
}
/// Creates a progress bar for a task of size `total`.
/// NOTE(review): the unit of `total` is presumably bytes — confirm against
/// the `tf` module, which is where this is expected to be called.
fn create_progress_bar(&self, total: u64) -> Box<dyn ProgressBar>;
/// Chooses the receiver to connect to from a running broadcast discovery.
///
/// `discovery` is the value returned by `BroadcastReceiver::start`.
/// NOTE(review): the tuple presumably holds a stop callback, a channel of
/// discovered `(address, receiver data)` pairs and the discovery thread's
/// handle — confirm against `BroadcastReceiver::start`.
///
/// Returning `None` makes [`run_v1_0`] fail with
/// "No valid receiver address found via broadcast".
fn select_receiver_addr<U>(
&self,
discovery: (
Box<dyn FnOnce() + Send>,
Receiver<(SocketAddr, U)>,
JoinHandle<()>,
),
) -> Option<SocketAddr>
where
U: Clone + Display + PartialEq + ReceiverData + Send + 'static;
}
/// Runs one complete sender-side session of protocol `fs-share:v1.0`.
///
/// Steps, in order:
/// 1. Resolve the receiver address: use `app.receiver_addr()` when it is
///    `Some`, otherwise start broadcast discovery and let
///    `app.select_receiver_addr` pick one.
/// 2. Open the connection with `connect`, then run
///    `app.preprocess_connection` and `app.auth`.
/// 3. Send the version line `fs-share:v1.0\n` and read the 8-byte reply,
///    which must be `:accept:`.
/// 4. Upgrade the stream with `app.upgrade_stream` and run
///    `app.postprocess_connection`.
/// 5. Send every path in `files_to_send`, followed by the `:eof:` marker.
/// 6. Read 5-byte markers from the peer: `:fff:` announces an incoming file,
///    `:eof:` ends the session.
///
/// `R` is the receiver-description type handed to the broadcast discovery.
/// It does not appear in the argument list, so callers must name it
/// explicitly.
///
/// # Errors
///
/// Returns an error when no receiver address can be determined, when
/// connecting or any hook fails, when authentication is refused, when the peer
/// rejects the protocol version or answers with an unknown reply, when a file
/// transfer fails, on any I/O error, and when the peer sends an unknown
/// marker after the upload phase.
pub fn run_v1_0<A, P, ConnectFn, R>(
    app: A,
    files_to_send: impl Iterator<Item = P>,
    connect: ConnectFn,
) -> anyhow::Result<()>
where
    A: App,
    P: AsRef<Path>,
    ConnectFn: Fn(SocketAddr) -> io::Result<A::Stream>,
    R: for<'a> TryFrom<(SocketAddr, PayloadReader<'a>)>
        + ReceiverData
        + Clone
        + Display
        + PartialEq
        + Send
        + 'static,
{
    // An explicitly configured address wins; discovery is only the fallback.
    let receiver_addr = match app.receiver_addr() {
        Some(addr) => addr,
        None => {
            let receiver = BroadcastReceiver::builder()
                .prefix(app.prefix())
                .bind_addr(app.broadcast_addr())
                .buffer_size(4 * 1024)
                .build()
                .context("Failed to build BroadcastReceiver")?;
            let discovery = receiver.start::<R>();
            app.select_receiver_addr(discovery)
                .context("No valid receiver address found via broadcast")?
        }
    };

    let mut stream = connect(receiver_addr)
        .with_context(|| format!("Failed to connect to {}", receiver_addr))?;
    app.preprocess_connection(&mut stream)
        .context("Pre-processing failed")?;
    if !app.auth(&mut stream)? {
        anyhow::bail!("authentication failed");
    }

    // Version handshake: announce our protocol version, then expect an
    // 8-byte verdict from the receiver.
    stream
        .write_all(b"fs-share:v1.0\n")
        .context("Failed to send protocol version")?;
    stream
        .flush()
        .context("Failed to send protocol version")?;
    let mut reply = [0u8; 8];
    stream
        .read_exact(&mut reply)
        .context("Failed to read handshake reply")?;
    match &reply {
        b":accept:" => {}
        b":reject:" => anyhow::bail!("failed to connect, version not match"),
        _ => anyhow::bail!("invalid connection"),
    }

    let mut stream = app.upgrade_stream(stream)?;
    app.postprocess_connection(&mut stream)
        .context("postprocess failed")?;

    // Upload phase: push every file, then tell the peer we are done.
    for path in files_to_send {
        sender_send_file(&app, path, &mut stream)?;
    }
    stream
        .write_all(b":eof:")
        .context("Failed to send end-of-files marker")?;
    stream
        .flush()
        .context("Failed to send end-of-files marker")?;

    // Download phase: the peer may now send files back until it signals EOF.
    loop {
        let mut marker = [0u8; 5];
        stream
            .read_exact(&mut marker)
            .context("Failed to read protocol marker")?;
        match &marker {
            b":fff:" => sender_receive_file(&app, &mut stream)?,
            b":eof:" => break,
            // The marker comes from the remote peer, so an unknown value is a
            // protocol error to report, not an internal invariant violation
            // that justifies a panic.
            other => anyhow::bail!(
                "Invalid protocol marker: {:?}",
                String::from_utf8_lossy(other)
            ),
        }
    }
    Ok(())
}