use crate::error::HlsResult;
use crate::stream::StreamParam;
use crate::*;
use std::io;
use std::io::{Read, Write};
pub struct SyncStream<S> {
conn: Connection,
stream: S,
handshake_finished: bool,
encrypted_channel: bool,
hello_retrying: bool,
read_buffer: Buffer,
write_buffer: Buffer,
}
impl<S: Read + Write> SyncStream<S> {
fn new(stream: S, conn: Connection, mut config: Config<'_>, buffer: Buffer) -> HlsResult<SyncStream<S>> {
let mut stream = SyncStream {
stream,
conn,
handshake_finished: false,
encrypted_channel: false,
hello_retrying: false,
read_buffer: Buffer::with_capacity(16438),
write_buffer: buffer,
};
if let Config::Client(ref mut config) = config {
stream.handle_client_hello(config)?;
stream.stream.write_all(stream.write_buffer.filled())?;
stream.write_buffer.reset();
}
let mut app_buffer = Buffer::with_capacity(16438);
loop {
let record_len = stream.read_next_packet()?;
stream.handle_record(record_len, Some(&mut config), app_buffer.unfilled())?;
if !stream.write_buffer.is_empty() {
stream.stream.write_all(stream.write_buffer.filled())?;
stream.write_buffer.reset();
}
if stream.handshake_finished { break; }
}
Ok(stream)
}
pub fn connect(config: ClientConfig, stream: S) -> HlsResult<SyncStream<S>> {
let session = config.session.as_ref().cloned().unwrap_or_else(|| Default::default());
SyncStream::new(stream, Connection::from_client(rand::random(), session, config.key_log.clone()).with_verify(config.verify), Config::Client(config), Buffer::default())
}
pub fn accept(stream: S, config: ServerConfig<'_>) -> HlsResult<SyncStream<S>> {
SyncStream::new(stream, Connection::default(), Config::Server(config), Buffer::with_capacity(16437))
}
pub fn shutdown(&mut self) -> HlsResult<()> {
self.write_buffer.reset();
let out = self.write_buffer.unfilled();
let record_len = self.conn.make_message(RecordType::Alert, out, &[1, 0])?;
self.stream.write_all(&out[..record_len])?;
Ok(())
}
pub fn alpn(&self) -> Option<&str> {
Some(self.conn.alpn()?.value())
}
pub fn connection(&self) -> &Connection {
&self.conn
}
}
impl<S: Read + Write> StreamHandle for SyncStream<S> {
fn stream_param(&mut self) -> (&mut Buffer, StreamParam<'_>) {
(&mut self.read_buffer, StreamParam {
handshake_finish: &mut self.handshake_finished,
encrypted_channel: &mut self.encrypted_channel,
hello_retrying: &mut self.hello_retrying,
write_buffer: &mut self.write_buffer,
conn: &mut self.conn,
})
}
}
impl<S: Read> SyncStream<S> {
fn read_size(&mut self, max_size: usize) -> HlsResult<()> {
while self.read_buffer.len() < max_size {
self.read_buffer.check_move(max_size)?;
let len = self.stream.read(self.read_buffer.unfilled())?;
if len == 0 { return Err(HlsError::PeerClosedConnection); }
self.read_buffer.add_len(len);
}
Ok(())
}
fn read_next_packet(&mut self) -> HlsResult<usize> {
if self.read_buffer.len() < 5 { self.read_size(5)?; }
let record_len = u16::from_be_bytes([self.read_buffer.filled()[3], self.read_buffer.filled()[4]]) as usize + 5;
self.read_size(record_len)?;
Ok(record_len)
}
}
impl<S: Read + Write> Read for SyncStream<S> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
loop {
let record_len = self.read_next_packet()?;
let size = self.handle_record(record_len, None, buf)?;
if size > 0 { return Ok(size); }
};
}
}
impl<S: Write> Write for SyncStream<S> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut sent = 0;
for chunk in buf.chunks(16384) {
self.write_buffer.reset();
let record_len = self.conn.make_message(RecordType::ApplicationData, self.write_buffer.unfilled(), chunk)?;
self.write_buffer.add_len(record_len);
loop {
let len = self.stream.write(self.write_buffer.filled())?;
if self.write_buffer.used_empty(len) { break; }
}
sent += chunk.len();
}
Ok(sent)
}
fn flush(&mut self) -> io::Result<()> {
self.stream.flush()
}
}