1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
/**
* @file
* This contains the API for the test socket server used to test the library's
* core `lcbio` functionality.
*/
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include "config.h"
#ifndef _WIN32
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#define closesocket close
#else
#include "config.h"
#define sched_yield()
#define SHUT_RDWR SD_BOTH
#define ssize_t SSIZE_T
#endif
#include <cassert>
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <cstdio>
#include <list>
#include <vector>
#include <string>
#include "threads.h"
namespace LCBTest {
class TestServer;
class TestConnection;
/**
 * Thin wrapper around a numeric socket handle.
 *
 * The object stores the handle together with storage for a local and a remote
 * address, and offers small helpers for address queries, socket options and
 * basic I/O. close(), getFD(), send() and recv() are virtual and may be
 * overridden by subclasses.
 */
class SockFD {
public:
    /** @param sock The socket handle to wrap */
    SockFD(int sock);
    virtual ~SockFD();

    /** Close the socket (implementation lives outside this header) */
    virtual void close();

    /** @return The wrapped socket handle */
    virtual int getFD() const { return fd; }

    /** Implicit conversion to the numeric handle; goes through getFD() */
    operator int() const { return getFD(); }

    /** Populate the stored remote address (implementation lives outside
     * this header) */
    void loadRemoteAddr();

    /**@return The stored local address reinterpreted as an IPv4
     * `sockaddr_in`. The address family is not checked here. */
    const struct sockaddr_in& localAddr4() {
        return *reinterpret_cast<struct sockaddr_in *>(&sa_local);
    }

    /**@return The stored remote address reinterpreted as an IPv4
     * `sockaddr_in`. The address family is not checked here. */
    const struct sockaddr_in& remoteAddr4() {
        return *reinterpret_cast<struct sockaddr_in *>(&sa_remote);
    }

    /** @return The local port, converted to host byte order */
    uint16_t getLocalPort() {
        const struct sockaddr_in& addr = localAddr4();
        return ntohs(addr.sin_port);
    }

    /** @return The remote port, converted to host byte order */
    uint16_t getRemotePort() {
        const struct sockaddr_in& addr = remoteAddr4();
        return ntohs(addr.sin_port);
    }

    /** @return The local host, as formatted by getHostCommon() */
    std::string getLocalHost() {
        return getHostCommon(&sa_local);
    }

    /** @return The remote host, as formatted by getHostCommon() */
    std::string getRemoteHost() {
        return getHostCommon(&sa_remote);
    }

    /**
     * Set a socket option via `setsockopt()`.
     * @param level The protocol level of the option
     * @param option The option name
     * @param val The option value; its bytes and `sizeof` are passed as-is
     * @return true if `setsockopt()` returned 0, false otherwise
     */
    template <typename T> bool setOption(int level, int option, T val) {
        return setsockopt(fd, level, option,
                          reinterpret_cast<char *>(&val), sizeof(val)) == 0;
    }

    /**
     * Toggle the `TCP_NODELAY` option.
     * @param enabled Whether the option should be switched on
     * @return true if the option was set successfully
     */
    bool setNodelay(bool enabled = true) {
        const int flag = enabled ? 1 : 0;
        return setOption<int>(IPPROTO_TCP, TCP_NODELAY, flag);
    }

    /** Accept a client on this socket (implementation lives outside this
     * header) */
    SockFD *acceptClient();

    /**
     * Send bytes on the socket using `::send()`.
     * @param buf The data to send
     * @param n The number of bytes in `buf`
     * @param flags Flags forwarded to `::send()`
     * @return The result of `::send()` converted to `size_t`.
     * NOTE(review): because the return type is unsigned, an error return of
     * -1 from `::send()` shows up as a very large value - callers should be
     * aware of this.
     */
    virtual size_t send(const void *buf, size_t n, int flags = 0) {
        const char *bytes = static_cast<const char *>(buf);
        return ::send(fd, bytes, n, flags);
    }

    /**
     * Receive bytes from the socket using `::recv()`.
     * @param buf The buffer to fill
     * @param n The capacity of `buf`
     * @param flags Flags forwarded to `::recv()`
     * @return The result of `::recv()`
     */
    virtual ssize_t recv(void *buf, size_t n, int flags = 0) {
        char *bytes = static_cast<char *>(buf);
        return ::recv(fd, bytes, n, flags);
    }

    /** Create a listening socket (implementation lives outside this header) */
    static SockFD *newListener();

    /** Create a client socket for the given server socket (implementation
     * lives outside this header) */
    static SockFD *newClient(SockFD *server);

private:
    /** Shared helper behind getLocalHost() and getRemoteHost() */
    static std::string getHostCommon(sockaddr_storage *ss);

    socklen_t naddr;
    struct sockaddr_storage sa_local;
    struct sockaddr_storage sa_remote;
#ifdef _WIN32
    SOCKET fd;
#else
    int fd;
#endif
    /* Copy constructor is private; presumably left undefined so that the
     * handle cannot be copied - confirm against the implementation file. */
    SockFD(SockFD&);
};
/**
 * A Future describes one action the server is asked to perform. The server is
 * essentially a dumb data handler, so the test logic (in this case, the
 * client) drives it by handing over Future objects.
 *
 * A future can be blocked on with wait(), polled with checkDone(), and its
 * outcome inspected with isOk().
 *
 * Futures are executed in the context of the _server_'s thread, which means a
 * future may already be finished by the time wait() is called.
 *
 * The concrete subclasses document how each kind of action is used.
 *
 * @see SendFuture
 * @see RecvFuture
 */
class Future {
public:
    /** Block until the @ref TestConnection has completed the task */
    void wait();

    /**Report whether the task succeeded. The answer is only meaningful after
     * wait() has returned.
     * @return true on success, false on failure */
    bool isOk() {
        if (failed) {
            return false;
        }
        return true;
    }

    /**Poll for completion without blocking
     * @return true if completed, false otherwise */
    bool checkDone();

    virtual ~Future();

protected:
    friend class TestConnection;

    /**Lock the state of the Future. Perform the action after calling this,
     * then call endUpdate() once the action is finished. */
    void startUpdate();

    /** Counterpart of startUpdate(); closes the update section */
    void endUpdate();

    /** Mark the future as failed inside its own
     * startUpdate()/endUpdate() section */
    void updateFailed() {
        startUpdate();
        bail();
        endUpdate();
    }

    /** Flag this action as failed, remember the current `errno` and print it
     * to standard output. Call only from within an active
     * startUpdate()/endUpdate() section. */
    void bail() {
        failed = true;
        last_errno = errno;
        fprintf(stdout, "Bailing: Error=%d\n", last_errno);
    }

    /** Supplied by subclasses to tell whether the action has finished
     * @return true if done, false otherwise */
    virtual bool isDone() = 0;

    Future();

private:
    Mutex mutex;
    Condvar cond;
    volatile bool failed;

    /** @return true once the action is done or has failed */
    bool shouldEnd() {
        if (isDone()) {
            return true;
        }
        return failed;
    }

    volatile int last_errno;
};
/**
 * Future implementation that makes the server _send_ a buffer to the client.
 *
 * The data is copied into the future at construction time, so the caller's
 * buffer does not need to outlive the constructor call.
 */
class SendFuture : public Future {
public:
    /**@param bytes The buffer to send. May be `NULL` when `nbytes` is 0.
     * @param nbytes The number of bytes to send */
    SendFuture(const void *bytes, size_t nbytes) : Future(), nsent(0) {
        // A NULL or zero-length request simply leaves the buffer empty.
        if (bytes != NULL && nbytes != 0) {
            const char *first = static_cast<const char *>(bytes);
            buf.assign(first, first + nbytes);
        }
    }

    /** @param ss The string whose contents should be sent */
    SendFuture(const std::string& ss)
        : Future(), nsent(0), buf(ss.begin(), ss.end()) {
    }

protected:
    /** @return true once every byte of the buffer has been accounted for
     * via setSent() */
    bool isDone() {
        return nsent == buf.size();
    }

private:
    friend class TestConnection;

    /**Returns the beginning of the unsent buffer
     * @param[out] outbuf receives a pointer to the first unsent byte. When the
     *             buffer is empty this is `NULL`; when everything has been
     *             sent it is the one-past-the-end pointer. In both cases it
     *             must not be dereferenced.
     * @return The number of bytes still to be sent */
    size_t getBuf(void **outbuf) {
        const size_t offset = nsent;
        char *start = NULL;
        // Indexing the vector at offset == size() (or at all when it is empty)
        // is out of range, so derive the pointer from the first element.
        if (!buf.empty()) {
            start = &buf[0] + offset;
        }
        *outbuf = start;
        return buf.size() - offset;
    }

    /**Called to update the sent count.
     * @param n The number of bytes just sent. */
    void setSent(size_t n) {
        nsent += n;
    }

    volatile unsigned nsent;
    std::vector<char> buf;
};
/**
 * @ref Future implementation which tells the server to receive a fixed number
 * of bytes _sent_ by the client
 */
class RecvFuture : public Future {
public:
    /** @param n The exact number of bytes to receive. */
    RecvFuture(size_t n) : Future() {
        reinit(n);
    }

    /**Empty the receive buffer and change the number of bytes to wait for.
     * Some tests use this to avoid constructing a new object.
     * @param n The new number of bytes to wait for. */
    void reinit(size_t n) {
        required = n;
        buf.clear();
        buf.reserve(n);
    }

    /** Get a copy of what the server has received, as a `vector`
     * @return The received data */
    std::vector<char> getBuf() {
        std::vector<char> copy(buf);
        return copy;
    }

    /** Get a copy of what the server has received, as a string
     * @return The received data */
    std::string getString() {
        std::string text;
        text.assign(buf.begin(), buf.end());
        return text;
    }

protected:
    /** @return true when the buffer holds exactly the requested byte count */
    bool isDone() { return required == buf.size(); }

private:
    friend class TestConnection;

    /**@return The number of bytes still to be received.
     * Used by @ref TestConnection */
    size_t getRequired() {
        const size_t have = buf.size();
        return required - have;
    }

    /**Append newly received bytes to the internal buffer.
     * @param rbuf The buffer containing the new contents
     * @param nbuf The length of the buffer
     * Used by @ref TestConnection */
    void setReceived(void *rbuf, size_t nbuf) {
        char *first = static_cast<char *>(rbuf);
        char *last = first + nbuf;
        buf.insert(buf.end(), first, last);
    }

    volatile size_t required;
    std::vector<char> buf;
};
/**
 * @ref Future implementation which makes the server _close_ the connection.
 * wait() returns once the connection has been closed, which is handy for
 * testing behavior on a closed socket (i.e. a socket on which the remote
 * closed the connection).
 */
class CloseFuture : public Future {
public:
    /**
     * @enum CloseTime
     *
     * Selects _when_ the close happens. A @ref CloseFuture can be carried out
     * either before any I/O is pending (i.e. ignoring any outstanding I/O
     * requests), or _after_ all pending I/O (i.e. any @ref SendFuture or
     * @ref RecvFuture objects) has completed.
     */
    enum CloseTime {
        BEFORE_IO /**< Close socket before any I/O is performed */,
        AFTER_IO /**< Closed socket once all pending I/O operations have successfully completed */
    };

    /**@param type See documentation for @ref CloseTime */
    CloseFuture(CloseTime type)
        : Future(), performed(false), closeTime(type) {
    }

protected:
    /** @return true once setDone() has been called */
    bool isDone() { return performed; }

private:
    friend class TestConnection;

    /** Record that the close has taken place */
    void setDone() { performed = true; }

    /** @return The close timing requested at construction */
    CloseTime getType() { return closeTime; }

    volatile bool performed;
    CloseTime closeTime;
};
/**
* Representation of a server side remote endpoint
*
* A TestConnection object is created whenever the @ref TestServer accepts a
* new connection. It can be used by tests to coordinate various actions
* between client and server, using the various @ref Future implementations.
*
* @note Futures of different kinds can exist concurrently within the same
* TestConnection object; however, only _one_ future of a given type can
* be active; so for example, a @ref SendFuture and a @ref RecvFuture may
* be active concurrently, but two @ref CloseFuture objects may not.
*
* Additionally, note that any @ref Future object passed must remain valid
* until it has completed (i.e. Future::wait() or Future::isDone() returns
* true).
*/
class TestConnection {
public:
/**
* Set the @ref SendFuture object to indicate that the server should
* send data
* @param f
*/
void setSend(SendFuture *f) {
setCommon(f, (void **)&f_send);
}
/**
* Indicate that the server should read data. The future object indicates
* how much data to read
* @param f
*/
void setRecv(RecvFuture *f) {
setCommon(f, (void **)&f_recv);
}
/**
* Indicate that the connection should be closed, optionally before or
* after outstanding IO (See @ref CloseFuture for more details).
* @param f
*/
void setClose(CloseFuture *f) {
setCommon(f, (void **)&f_close);
}
/**
* _Immediately_ close the underlying socket connection on the server side.
* This is not the same as @ref CloseFuture which merely _schedules_
* a close
*/
void close() {
datasock->close();
ctlfd_loop->close();
ctlfd_user->close();
ctlfd_lsn->close();
}
/**
* Return the remote port from which the client initiated the connection.
* This is used to determine which the object should
* be associated with a given client (i.e. @ref ESocket) object.
* @return The port of the client.
*/
uint16_t getPeerPort() {
return datasock->getRemotePort();
}
inline void _doRun();
protected:
TestConnection(TestServer *server, SockFD *newsock);
~TestConnection();
virtual void run();
friend class TestServer;
private:
SockFD *datasock;
SockFD *ctlfd_loop;
SockFD *ctlfd_lsn;
SockFD *ctlfd_user;
Mutex mutex;
Condvar initcond;
Thread *thr;
TestServer *parent;
SendFuture *f_send;
RecvFuture *f_recv;
CloseFuture *f_close;
void setCommon(void *src, void **target);
void sendData();
void recvData();
void handleClose();
};
/**
* Represents a listening socket for a test "Server". This server accepts
* connections from clients, and for each new connection, creates a new
* @ref TestConneciton object.
*/
class TestServer {
public:
TestServer();
~TestServer();
/** Run the server. This will open a new thread */
void run();
/** Stop the server. This will close the listening sokcet */
void close() {
closed = true;
lsn->close();
}
bool isClosed() {
return closed;
}
/**Find a connection with a given client port
* @param cliport The client port to search for
* @return The connection object, or `NULL` if no such connection exists.
*/
TestConnection* findConnection(uint16_t cliport);
/**
* Get the listening port
* @return The listening ports that clients may use to connect to this
* object.
*/
uint16_t getListenPort() {
return lsn->getLocalPort();
}
/**
* Get the IP address (usually `127.0.0.1` as a string)
* @return The host the server is listening on
*/
std::string getHostString() {
return lsn->getLocalHost();
}
/**
* Get the listening port, as a string
* @return The listening port
*/
std::string getPortString();
typedef SockFD * (SocketFactory)(int);
SocketFactory *factory;
static SockFD *plainSocketFactory(int fd) { return new SockFD(fd); }
static SockFD *sslSocketFactory(int fd);
private:
friend class TestConnection;
bool closed;
SockFD *lsn;
Thread *thr;
Mutex mutex;
std::list<TestConnection *> conns;
void startConnection(TestConnection *conn);
};
}