use crate::error::{self, Result};
use crate::ingress::{Buffer, ProtocolVersion, SenderBuilder};
use std::fmt::{Debug, Formatter};
#[cfg(feature = "sync-sender-tcp")]
mod tcp;
#[cfg(feature = "sync-sender-tcp")]
pub(crate) use tcp::*;
#[cfg(feature = "sync-sender-tcp")]
use std::io::Write;
#[cfg(feature = "sync-sender-tcp")]
use crate::ingress::map_io_to_socket_err;
#[cfg(feature = "sync-sender-http")]
mod http;
#[cfg(feature = "sync-sender-http")]
pub(crate) use http::*;
pub(crate) enum SyncProtocolHandler {
#[cfg(feature = "sync-sender-tcp")]
SyncTcp(SyncConnection),
#[cfg(feature = "sync-sender-http")]
SyncHttp(SyncHttpHandlerState),
}
pub struct Sender {
descr: String,
handler: SyncProtocolHandler,
connected: bool,
max_buf_size: usize,
protocol_version: ProtocolVersion,
max_name_len: usize,
}
impl Debug for Sender {
fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.write_str(self.descr.as_str())
}
}
impl Sender {
pub(crate) fn new(
descr: String,
handler: SyncProtocolHandler,
max_buf_size: usize,
protocol_version: ProtocolVersion,
max_name_len: usize,
) -> Self {
Self {
descr,
handler,
connected: true,
max_buf_size,
protocol_version,
max_name_len,
}
}
pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
SenderBuilder::from_conf(conf)?.build()
}
pub fn from_env() -> Result<Self> {
SenderBuilder::from_env()?.build()
}
pub fn new_buffer(&self) -> Buffer {
Buffer::with_max_name_len(self.protocol_version, self.max_name_len)
}
#[allow(unused_variables)]
fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
if !self.connected {
return Err(error::fmt!(
SocketError,
"Could not flush buffer: not connected to database."
));
}
buf.check_can_flush()?;
if buf.len() > self.max_buf_size {
return Err(error::fmt!(
InvalidApiCall,
"Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
buf.len(),
self.max_buf_size
));
}
self.check_protocol_version(buf.protocol_version())?;
let bytes = buf.as_bytes();
if bytes.is_empty() {
return Ok(());
}
match self.handler {
#[cfg(feature = "sync-sender-tcp")]
SyncProtocolHandler::SyncTcp(ref mut conn) => {
if transactional {
return Err(error::fmt!(
InvalidApiCall,
"Transactional flushes are not supported for ILP over TCP."
));
}
conn.write_all(bytes).map_err(|io_err| {
self.connected = false;
map_io_to_socket_err("Could not flush buffer: ", io_err)
})?;
conn.flush().map_err(|io_err| {
self.connected = false;
map_io_to_socket_err("Could not flush to network: ", io_err)
})?;
Ok(())
}
#[cfg(feature = "sync-sender-http")]
SyncProtocolHandler::SyncHttp(ref state) => {
if transactional && !buf.transactional() {
return Err(error::fmt!(
InvalidApiCall,
"Buffer contains lines for multiple tables. \
Transactional flushes are only supported for buffers containing lines for a single table."
));
}
let request_min_throughput = *state.config.request_min_throughput;
let extra_time = if request_min_throughput > 0 {
(bytes.len() as f64) / (request_min_throughput as f64)
} else {
0.0f64
};
match http_send_with_retries(
state,
bytes,
*state.config.request_timeout + std::time::Duration::from_secs_f64(extra_time),
*state.config.retry_timeout,
) {
Ok(res) => {
if res.status().is_client_error() || res.status().is_server_error() {
Err(parse_http_error(res.status().as_u16(), res))
} else {
res.into_body();
Ok(())
}
}
Err(err) => Err(crate::error::Error::from_ureq_error(err, &state.url)),
}
}
}
}
#[cfg(feature = "sync-sender-http")]
pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
self.flush_impl(buf, transactional)
}
pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
self.flush_impl(buf, false)
}
pub fn flush(&mut self, buf: &mut Buffer) -> crate::Result<()> {
self.flush_impl(buf, false)?;
buf.clear();
Ok(())
}
pub fn must_close(&self) -> bool {
!self.connected
}
pub fn protocol_version(&self) -> ProtocolVersion {
self.protocol_version
}
pub fn max_name_len(&self) -> usize {
self.max_name_len
}
#[inline(always)]
fn check_protocol_version(&self, version: ProtocolVersion) -> Result<()> {
if self.protocol_version != version {
return Err(error::fmt!(
ProtocolVersionError,
"Attempting to send with protocol version {} \
but the sender is configured to use protocol version {}",
version,
self.protocol_version
));
}
Ok(())
}
}