1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! # Channel module
//!
//! This module provides a `bb8` connection manager over the Sōzu command channel.

use std::path::PathBuf;

use bb8::ManageConnection;
use sozu_command_lib::{
    channel::Channel,
    config::Config,
    proto::command::{Request, Response},
};
use tracing::debug;

use crate::socket;

// -----------------------------------------------------------------------------
// Error

/// Errors reported while building connection properties, opening a channel or
/// checking the health of an existing one.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The configuration could not be loaded from the given path; wraps the
    /// error returned by `crate::config::try_from`.
    #[error("failed to load configuration, {0}")]
    Load(crate::config::Error),
    /// Connecting to the socket failed; wraps the error returned by
    /// `socket::connect`.
    #[error("failed to connect to socket, {0}")]
    Connect(socket::Error),
    /// The socket behind a connection reported an error (or the error could
    /// not be retrieved) when `take_error` was called on it.
    #[error("socket is unhealthy, {0}")]
    SocketError(std::io::Error),
    /// Switching the freshly created channel to blocking mode failed.
    #[error("failed to set blocking the socket, {0}")]
    Blocking(Box<dyn std::error::Error + Send>),
}

// -----------------------------------------------------------------------------
// ConnectionProperties

/// Settings used by `ConnectionManager` to open a channel on the Sōzu socket.
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ConnectionProperties {
    /// Path of the socket to connect to.
    pub socket: PathBuf,
    /// Buffer size handed to `Channel::new` when a connection is created.
    pub buffer_size: usize,
    /// Maximum buffer size handed to `Channel::new` when a connection is created.
    pub max_buffer_size: usize,
}

impl From<&Config> for ConnectionProperties {
    /// Extracts the command socket path and the command buffer sizes from an
    /// already loaded Sōzu configuration.
    #[tracing::instrument(skip_all)]
    fn from(config: &Config) -> Self {
        let socket = PathBuf::from(&config.command_socket);
        let buffer_size = config.command_buffer_size;
        let max_buffer_size = config.max_command_buffer_size;

        Self {
            socket,
            buffer_size,
            max_buffer_size,
        }
    }
}

impl TryFrom<&PathBuf> for ConnectionProperties {
    type Error = Error;

    #[tracing::instrument]
    fn try_from(path: &PathBuf) -> Result<Self, Self::Error> {
        let config = crate::config::try_from(path).map_err(Error::Load)?;

        Ok(Self::from(&config))
    }
}

impl ConnectionProperties {
    /// Creates connection properties from explicit values.
    ///
    /// * `socket` - path of the socket to connect to.
    /// * `buffer_size` - buffer size used when the channel is created.
    /// * `max_buffer_size` - maximum buffer size used when the channel is
    ///   created.
    ///
    /// The constructor is public, like `ConnectionManager::new`, so that
    /// callers can build the properties without going through a `Config`.
    #[tracing::instrument]
    pub fn new(socket: PathBuf, buffer_size: usize, max_buffer_size: usize) -> Self {
        Self {
            socket,
            buffer_size,
            max_buffer_size,
        }
    }
}

// -----------------------------------------------------------------------------
// ConnectionManager

/// Connection manager implementing `bb8::ManageConnection` for Sōzu channels.
#[derive(Clone, Debug)]
pub struct ConnectionManager {
    /// Socket path and buffer sizes used each time a connection is opened.
    opts: ConnectionProperties,
}

#[async_trait::async_trait]
impl ManageConnection for ConnectionManager {
    type Connection = Channel<Request, Response>;
    type Error = Error;

    #[tracing::instrument(skip_all)]
    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
        debug!(
            path = self.opts.socket.display().to_string(),
            "Connect to Sōzu' socket"
        );
        let sock = socket::connect(&self.opts.socket)
            .await
            .map_err(Error::Connect)?;

        let mut channel = Channel::new(sock, self.opts.buffer_size, self.opts.max_buffer_size);

        channel
            .blocking()
            .map_err(|err| Error::Blocking(err.into()))?;

        Ok(channel)
    }

    #[tracing::instrument(skip_all)]
    async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
        // We do not perform a ping-pong message as we know that the unix socket
        // is working if it is present.
        //
        // We only check that the socket behind the connection has no error.
        match conn.sock.take_error() {
            Ok(Some(err)) | Err(err) => Err(Error::SocketError(err)),
            Ok(None) => Ok(()),
        }
    }

    #[tracing::instrument(skip_all)]
    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
        // Check if the connection could be reused by the connection pool.
        //
        // We only check that the socket behind the connection has no error.
        match conn.sock.take_error() {
            Ok(Some(_)) | Err(_) => true,
            Ok(None) => false,
        }
    }
}

impl ConnectionManager {
    #[tracing::instrument(skip_all)]
    pub fn new(opts: ConnectionProperties) -> Self {
        Self { opts }
    }
}