daemon_engine/
unix.rs

1/**
2 * rust-daemon
3 * Unix Server and Connection Implementations
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::fs;
10use std::fmt::{Debug};
11use std::clone::{Clone};
12use libc::{gid_t, uid_t};
13
14use tokio::prelude::*;
15use tokio::spawn;
16use tokio_codec::{Encoder, Decoder};
17
18use tokio_uds::{UnixListener, UnixStream};
19
20use crate::server::Server;
21use crate::connection::Connection;
22use crate::error::Error;
23
24use users::{User, Group, get_group_by_gid, get_user_by_uid};
25
/// `UnixServer` is a `Server` specialised to `UnixStream` sockets, with a generic codec `C`
/// and a `UnixInfo` object associated with each connection.
///
/// # Examples
///
/// ```no_run
/// extern crate tokio;
/// use tokio::prelude::*;
/// use tokio::{spawn, run};
///
/// #[macro_use]
/// extern crate serde_derive;
///
/// extern crate daemon_engine;
/// use daemon_engine::{UnixServer, JsonCodec};
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Request {}
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Response {}
///
/// # fn main() {
///
/// let addr = "/var/tmp/test-daemon.sock";
/// let server = future::lazy(move || {
///     let mut s = UnixServer::<JsonCodec<Response, Request>>::new(&addr, JsonCodec::new()).unwrap();
///     let server_handle = s
///         .incoming()
///         .unwrap()
///         .for_each(move |r| {
///             println!("Request data {:?} info: {:?}", r.data(), r.info());
///             r.send(Response{}).wait().unwrap();
///             Ok(())
///         }).map_err(|_e| ());
///     spawn(server_handle);
///     Ok(())
/// });
/// run(server);
///
/// # }
/// ```
pub type UnixServer<C> = Server<UnixStream, C, UnixInfo>;
65
/// `UnixConnection` is a `Connection` over a `UnixStream` socket with a generic codec `C`.
///
/// # Examples
///
/// ```no_run
/// extern crate tokio;
/// use tokio::prelude::*;
/// use tokio::{spawn, run};
///
/// #[macro_use]
/// extern crate serde_derive;
///
/// extern crate daemon_engine;
/// use daemon_engine::{UnixConnection, JsonCodec, DaemonError};
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Request {}
///
/// #[derive(Debug, Clone, Serialize, Deserialize)]
/// struct Response {}
///
/// # fn main() {
/// let addr = "/var/tmp/test-daemon.sock";
/// let client = UnixConnection::<JsonCodec<Request, Response>>::new(&addr, JsonCodec::new()).wait().unwrap();
/// let (tx, rx) = client.split();
///
/// // Send data
/// tx.send(Request{}).wait().unwrap();
///
/// // Receive data
/// rx.map(|resp| -> Result<(), DaemonError> {
///    println!("Response: {:?}", resp);
///    Ok(())
/// }).wait().next();
/// # }
/// ```
pub type UnixConnection<C> = Connection<UnixStream, C>;
102
103impl <C> UnixConnection<C> 
104where
105    C: Encoder + Decoder + Clone + Send + 'static,
106    <C as Decoder>::Item: Send,
107    <C as Decoder>::Error: Send + Debug,
108{
109    /// Create a new client connected to the provided unix socket address
110    pub fn new(path: &str, codec: C) -> impl Future<Item=UnixConnection<C>, Error=Error> {
111        debug!("[connector] creating connection (unix path: {})", path);
112        // Create the socket future
113        UnixStream::connect(&path)
114        .map(|s| {
115            Connection::from_socket(s, codec)
116        }).map_err(|e| e.into() )
117    }
118
119    pub fn close(self) {
120        self.shutdown();
121    }
122}
123
124/// UnixInfo is an information object associated with a given UnixServer connection.
125/// 
126/// This is passed to the server request handler to allow ACLs and connection tracking
127#[derive(Clone, Debug)]
128pub struct  UnixInfo {
129    pub user: User,
130    pub group: Group,
131}
132
133impl UnixInfo {
134    pub fn new(uid: uid_t, gid: gid_t) -> UnixInfo {
135        let user = get_user_by_uid(uid).unwrap();
136        let group = get_group_by_gid(gid).unwrap();
137        UnixInfo{user, group}
138    }
139}
140
141/// Unix server implementation
142/// 
143/// This binds to and listens on a unix domain socket
144impl<C> UnixServer<C>
145where
146    C: Encoder + Decoder + Clone + Send + 'static,
147    <C as Decoder>::Item: Clone + Send + Debug,
148    <C as Decoder>::Error: Send + Debug,
149    <C as Encoder>::Item: Clone + Send + Debug,
150    <C as Encoder>::Error: Send + Debug,
151{
152    pub fn new(path: &str, codec: C) -> Result<UnixServer<C>, Error> {
153        // Pre-clear socket file
154        let _res = fs::remove_file(&path);
155
156        // Create base server instance
157        let server = Server::base(codec);
158
159        // Create listener socket
160        let socket = UnixListener::bind(&path)?;
161
162        let exit_rx = server.exit_rx.lock().unwrap().take();
163        let mut server_int = server.clone();
164
165        // Create listening thread
166        let tokio_server = socket
167            .incoming()
168            .for_each(move |s| {
169                let creds = s.peer_cred().unwrap();
170                let info = UnixInfo::new(creds.uid, creds.gid);
171                server_int.bind(info, s); 
172                Ok(())
173             })
174            .map_err(|err| {
175                error!("[server] accept error: {:?}", err);
176            })
177            .select2(exit_rx)
178            .then(|_| {
179                debug!("[server] closing listener");
180                Ok(())
181            });
182        spawn(tokio_server);
183
184        // Return new connector instance
185        Ok(server)
186    }
187
188    pub fn shutdown(&self) {
189
190    }
191}
192