1use std::path::PathBuf;
6
7use bb8::ManageConnection;
8use sozu_command_lib::{
9 channel::{Channel, ChannelError},
10 config::Config,
11 proto::command::{Request, Response},
12};
13use tracing::{debug, error};
14
15use crate::socket;
16
17#[derive(thiserror::Error, Debug)]
21pub enum Error {
22 #[error("failed to load configuration, {0}")]
23 Load(crate::config::Error),
24 #[error("failed to connect to socket, {0}")]
25 Connect(socket::Error),
26 #[error("socket is unhealthy, {0}")]
27 SocketError(std::io::Error),
28 #[error("failed to set blocking the socket, {0}")]
29 Blocking(ChannelError),
30}
31
32#[derive(PartialEq, Eq, Clone, Debug)]
36pub struct ConnectionProperties {
37 pub socket: PathBuf,
38 pub buffer_size: u64,
39 pub max_buffer_size: u64,
40}
41
42impl From<&Config> for ConnectionProperties {
43 #[tracing::instrument(skip_all)]
44 fn from(config: &Config) -> Self {
45 Self {
46 socket: PathBuf::from(&config.command_socket),
47 buffer_size: config.command_buffer_size,
48 max_buffer_size: config.max_command_buffer_size,
49 }
50 }
51}
52
53impl TryFrom<&PathBuf> for ConnectionProperties {
54 type Error = Error;
55
56 #[tracing::instrument]
57 fn try_from(path: &PathBuf) -> Result<Self, Self::Error> {
58 let config = crate::config::try_from(path).map_err(Error::Load)?;
59
60 Ok(Self::from(&config))
61 }
62}
63
64impl ConnectionProperties {
65 #[tracing::instrument]
66 fn new(socket: PathBuf, buffer_size: u64, max_buffer_size: u64) -> Self {
67 Self {
68 socket,
69 buffer_size,
70 max_buffer_size,
71 }
72 }
73}
74
75#[derive(Clone, Debug)]
79pub struct ConnectionManager {
80 opts: ConnectionProperties,
81}
82
83impl ManageConnection for ConnectionManager {
84 type Connection = Channel<Request, Response>;
85 type Error = Error;
86
87 #[tracing::instrument(skip_all)]
88 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
89 debug!(
90 path = self.opts.socket.display().to_string(),
91 "Connect to Sōzu' socket"
92 );
93 let sock = socket::connect(&self.opts.socket)
94 .await
95 .map_err(Error::Connect)?;
96
97 let mut channel = Channel::new(sock, self.opts.buffer_size, self.opts.max_buffer_size);
98
99 channel.blocking().map_err(Error::Blocking)?;
100
101 Ok(channel)
102 }
103
104 #[tracing::instrument(skip_all)]
105 async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
106 match conn.sock.take_error() {
111 Ok(Some(err)) | Err(err) => {
112 error!(error = err.to_string(), "connexion to sōzu has errors");
113 Err(Error::SocketError(err))
114 }
115 Ok(None) => Ok(()),
116 }
117 }
118
119 #[tracing::instrument(skip_all)]
120 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
121 match conn.sock.take_error() {
125 Ok(Some(err)) | Err(err) => {
126 error!(error = err.to_string(), "connexion to sōzu has errors");
127 true
128 }
129 Ok(None) => false,
130 }
131 }
132}
133
134impl ConnectionManager {
135 #[tracing::instrument(skip_all)]
136 pub fn new(opts: ConnectionProperties) -> Self {
137 Self { opts }
138 }
139}