daemon_engine/unix.rs
1/**
2 * rust-daemon
3 * Unix Server and Connection Implementations
4 *
5 * https://github.com/ryankurte/rust-daemon
6 * Copyright 2018 Ryan Kurte
7 */
8
9use std::fs;
10use std::fmt::{Debug};
11use std::clone::{Clone};
12use libc::{gid_t, uid_t};
13
14use tokio::prelude::*;
15use tokio::spawn;
16use tokio_codec::{Encoder, Decoder};
17
18use tokio_uds::{UnixListener, UnixStream};
19
20use crate::server::Server;
21use crate::connection::Connection;
22use crate::error::Error;
23
24use users::{User, Group, get_group_by_gid, get_user_by_uid};
25
26/// UnixServer is a Server implementation over UnixStream and UnixInfo types with a generic codec
27/// ```no_run
28/// extern crate tokio;
29/// use tokio::prelude::*;
30/// use tokio::{spawn, run};
31///
32/// #[macro_use]
33/// extern crate serde_derive;
34///
35/// extern crate daemon_engine;
36/// use daemon_engine::{UnixServer, JsonCodec};
37///
38/// #[derive(Debug, Clone, Serialize, Deserialize)]
39/// struct Request {}
40///
41/// #[derive(Debug, Clone, Serialize, Deserialize)]
42/// struct Response {}
43///
44/// # fn main() {
45///
46/// let addr = "/var/tmp/test-daemon.sock";
47/// let server = future::lazy(move || {
48/// let mut s = UnixServer::<JsonCodec<Response, Request>>::new(&addr, JsonCodec::new()).unwrap();
49/// let server_handle = s
50/// .incoming()
51/// .unwrap()
52/// .for_each(move |r| {
53/// println!("Request data {:?} info: {:?}", r.data(), r.info());
54/// r.send(Response{}).wait().unwrap();
55/// Ok(())
56/// }).map_err(|_e| ());
57/// spawn(server_handle);
58/// Ok(())
59/// });
60/// run(server);
61///
62/// # }
63/// ```
64pub type UnixServer<C> = Server<UnixStream, C, UnixInfo>;
65
66/// UnixConnection is a Connection implementation over UnixStream
67/// ```no_run
68/// use std::net::{SocketAddr, IpAddr, Ipv4Addr};
69///
70/// extern crate tokio;
71/// use tokio::prelude::*;
72/// use tokio::{spawn, run};
73///
74/// #[macro_use]
75/// extern crate serde_derive;
76///
77/// extern crate daemon_engine;
78/// use daemon_engine::{UnixConnection, JsonCodec, DaemonError};
79///
80/// #[derive(Debug, Clone, Serialize, Deserialize)]
81/// struct Request {}
82///
83/// #[derive(Debug, Clone, Serialize, Deserialize)]
84/// struct Response {}
85///
86/// # fn main() {
87/// let addr = "/var/tmp/test-daemon.sock";
88/// let client = UnixConnection::<JsonCodec<Request, Response>>::new(&addr, JsonCodec::new()).wait().unwrap();
89/// let (tx, rx) = client.split();
90///
91/// // Send data
92/// tx.send(Request{}).wait().unwrap();
93///
94/// // Receive data
95/// rx.map(|resp| -> Result<(), DaemonError> {
96/// println!("Response: {:?}", resp);
97/// Ok(())
98/// }).wait().next();
99/// # }
100/// ```
101pub type UnixConnection<C> = Connection<UnixStream, C>;
102
103impl <C> UnixConnection<C>
104where
105 C: Encoder + Decoder + Clone + Send + 'static,
106 <C as Decoder>::Item: Send,
107 <C as Decoder>::Error: Send + Debug,
108{
109 /// Create a new client connected to the provided unix socket address
110 pub fn new(path: &str, codec: C) -> impl Future<Item=UnixConnection<C>, Error=Error> {
111 debug!("[connector] creating connection (unix path: {})", path);
112 // Create the socket future
113 UnixStream::connect(&path)
114 .map(|s| {
115 Connection::from_socket(s, codec)
116 }).map_err(|e| e.into() )
117 }
118
119 pub fn close(self) {
120 self.shutdown();
121 }
122}
123
124/// UnixInfo is an information object associated with a given UnixServer connection.
125///
126/// This is passed to the server request handler to allow ACLs and connection tracking
127#[derive(Clone, Debug)]
128pub struct UnixInfo {
129 pub user: User,
130 pub group: Group,
131}
132
133impl UnixInfo {
134 pub fn new(uid: uid_t, gid: gid_t) -> UnixInfo {
135 let user = get_user_by_uid(uid).unwrap();
136 let group = get_group_by_gid(gid).unwrap();
137 UnixInfo{user, group}
138 }
139}
140
141/// Unix server implementation
142///
143/// This binds to and listens on a unix domain socket
144impl<C> UnixServer<C>
145where
146 C: Encoder + Decoder + Clone + Send + 'static,
147 <C as Decoder>::Item: Clone + Send + Debug,
148 <C as Decoder>::Error: Send + Debug,
149 <C as Encoder>::Item: Clone + Send + Debug,
150 <C as Encoder>::Error: Send + Debug,
151{
152 pub fn new(path: &str, codec: C) -> Result<UnixServer<C>, Error> {
153 // Pre-clear socket file
154 let _res = fs::remove_file(&path);
155
156 // Create base server instance
157 let server = Server::base(codec);
158
159 // Create listener socket
160 let socket = UnixListener::bind(&path)?;
161
162 let exit_rx = server.exit_rx.lock().unwrap().take();
163 let mut server_int = server.clone();
164
165 // Create listening thread
166 let tokio_server = socket
167 .incoming()
168 .for_each(move |s| {
169 let creds = s.peer_cred().unwrap();
170 let info = UnixInfo::new(creds.uid, creds.gid);
171 server_int.bind(info, s);
172 Ok(())
173 })
174 .map_err(|err| {
175 error!("[server] accept error: {:?}", err);
176 })
177 .select2(exit_rx)
178 .then(|_| {
179 debug!("[server] closing listener");
180 Ok(())
181 });
182 spawn(tokio_server);
183
184 // Return new connector instance
185 Ok(server)
186 }
187
188 pub fn shutdown(&self) {
189
190 }
191}
192