1use prelude::{Protocol, IoControl, GetSocketOption, SetSocketOption};
2use ffi::{RawFd, AsRawFd, ioctl, getsockopt, setsockopt,
3 getsockname, getpeername, socket, bind, shutdown};
4use core::{IoContext, AsIoContext, Socket, AsyncFd};
5use async::{Handler};
6use reactive_io::{AsAsyncFd, cancel, connect, async_connect, getnonblock, setnonblock,
7 send, async_send, recv, async_recv, write, async_write, read, async_read};
8use socket_base::{Shutdown, BytesReadable};
9use streams::Stream;
10
11use std::io;
12use std::fmt;
13
/// Socket type parameterised over a protocol `P`.
///
/// It pairs the protocol value handed to the constructor with the `AsyncFd`
/// that the free I/O functions in this file operate on.
pub struct StreamSocket<P> {
    // Protocol value; passed by reference to `getsockopt`, `setsockopt`,
    // `getsockname` and `getpeername`, cloned by `protocol()`, and printed by
    // the `Debug` impl.
    pro: P,
    // Descriptor wrapper created by `AsyncFd::new` in `from_raw_fd`; exposed
    // through `as_fd()`, `as_raw_fd()` and `as_ctx()`.
    fd: AsyncFd,
}
19
20impl<P: Protocol> StreamSocket<P> {
21 pub fn new(ctx: &IoContext, pro: P) -> io::Result<StreamSocket<P>> {
22 let fd = try!(socket(&pro));
23 Ok(unsafe { Self::from_raw_fd(ctx, pro, fd) })
24 }
25
26 pub fn async_connect<F>(&self, ep: &P::Endpoint, handler: F) -> F::Output
27 where F: Handler<(), io::Error>,
28 {
29 async_connect(self, ep, handler)
30 }
31
32 pub fn async_receive<F>(&self, buf: &mut [u8], flags: i32, handler: F) -> F::Output
33 where F: Handler<usize, io::Error>,
34 {
35 async_recv(self, buf, flags, handler)
36 }
37
38 pub fn async_send<F>(&self, buf: &[u8], flags: i32, handler: F) -> F::Output
39 where F: Handler<usize, io::Error>,
40 {
41 async_send(self, buf, flags, handler)
42 }
43
44 pub fn available(&self) -> io::Result<usize> {
45 let mut bytes = BytesReadable::default();
46 try!(self.io_control(&mut bytes));
47 Ok(bytes.get())
48 }
49
50 pub fn bind(&self, ep: &P::Endpoint) -> io::Result<()> {
51 bind(self, ep)
52 }
53
54 pub fn cancel(&self) {
55 cancel(self)
56 }
57
58 pub fn connect(&self, ep: &P:: Endpoint) -> io::Result<()> {
59 connect(self, ep)
60 }
61
62 pub fn get_non_blocking(&self) -> io::Result<bool> {
63 getnonblock(self)
64 }
65
66 pub fn get_option<C>(&self) -> io::Result<C>
67 where C: GetSocketOption<P>,
68 {
69 getsockopt(self, &self.pro)
70 }
71
72 pub fn io_control<C>(&self, cmd: &mut C) -> io::Result<()>
73 where C: IoControl,
74 {
75 ioctl(self, cmd)
76 }
77
78 pub fn local_endpoint(&self) -> io::Result<P::Endpoint> {
79 getsockname(self, &self.pro)
80 }
81
82 pub fn receive(&self, buf: &mut [u8], flags: i32) -> io::Result<usize> {
83 recv(self, buf, flags)
84 }
85
86 pub fn remote_endpoint(&self) -> io::Result<P::Endpoint> {
87 getpeername(self, &self.pro)
88 }
89
90 pub fn send(&self, buf: &[u8], flags: i32) -> io::Result<usize> {
91 send(self, buf, flags)
92 }
93
94 pub fn set_non_blocking(&self, on: bool) -> io::Result<()> {
95 setnonblock(self, on)
96 }
97
98 pub fn set_option<C>(&self, cmd: C) -> io::Result<()>
99 where C: SetSocketOption<P>,
100 {
101 setsockopt(self, &self.pro, cmd)
102 }
103
104 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
105 shutdown(self, how)
106 }
107}
108
109impl<P: Protocol> Stream<io::Error> for StreamSocket<P> {
110 fn async_read_some<F>(&self, buf: &mut [u8], handler: F) -> F::Output
111 where F: Handler<usize, io::Error>,
112 {
113 async_read(self, buf, handler)
114 }
115
116 fn async_write_some<F>(&self, buf: &[u8], handler: F) -> F::Output
117 where F: Handler<usize, io::Error>,
118 {
119 async_write(self, buf, handler)
120 }
121
122 fn read_some(&self, buf: &mut [u8]) -> io::Result<usize> {
123 read(self, buf)
124 }
125
126 fn write_some(&self, buf: &[u8]) -> io::Result<usize> {
127 write(self, buf)
128 }
129}
130
131impl<P: Protocol> fmt::Debug for StreamSocket<P> {
132 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
133 write!(f, "StreamSocket({:?})", self.pro)
134 }
135}
136
137impl<P> AsRawFd for StreamSocket<P> {
138 fn as_raw_fd(&self) -> RawFd {
139 self.fd.as_raw_fd()
140 }
141}
142
// NOTE(review): this asserts `Send` for `StreamSocket<P>` for every `P` — there
// is no `P: Send` bound, and the explicit impl replaces whatever the compiler
// would infer from the `pro: P` and `fd: AsyncFd` fields. Soundness therefore
// depends on both field types being safe to move across threads; confirm
// against `Protocol` and `AsyncFd`.
unsafe impl<P> Send for StreamSocket<P> { }
144
145unsafe impl<P> AsIoContext for StreamSocket<P> {
146 fn as_ctx(&self) -> &IoContext {
147 self.fd.as_ctx()
148 }
149}
150
151impl<P: Protocol> Socket<P> for StreamSocket<P> {
152 unsafe fn from_raw_fd(ctx: &IoContext, pro: P, fd: RawFd) -> StreamSocket<P> {
153 StreamSocket {
154 pro: pro,
155 fd: AsyncFd::new::<Self>(fd, ctx),
156 }
157 }
158
159 fn protocol(&self) -> P {
160 self.pro.clone()
161 }
162}
163
164impl<P: Protocol> AsAsyncFd for StreamSocket<P> {
165 fn as_fd(&self) -> &AsyncFd {
166 &self.fd
167 }
168}