1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
#include "../epoll.h"
#include "../error.h"
#include "../log.h"
#include "../qman.h"
#include <assert.h>
#include <demi/wait.h>
#include <errno.h>
#include <pthread.h>
#include <string.h>
#include <sys/epoll.h>
#include <time.h>
#include <glue.h>
int __epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
{
// Check if epoll descriptor is managed by Demikernel.
if (epfd < EPOLL_MAX_FDS)
{
TRACE("not managed by Demikernel epfd=%d", epfd);
errno = EBADF;
return -1;
}
int demikernel_epfd = -1;
if ((demikernel_epfd = queue_man_get_demikernel_epfd(epfd - EPOLL_MAX_FDS)) == -1)
{
TRACE("not managed by Demikernel epfd=%d", epfd);
errno = EBADF;
return -1;
}
TRACE("demikernel_epfd=%d, events=%p, maxevents=%d, timeout=%d", epfd, (void *)events, maxevents, timeout);
// Check for invalid epoll file descriptor.
if ((demikernel_epfd < 0) || (demikernel_epfd >= EPOLL_MAX_FDS))
{
ERROR("invalid epoll file descriptor epfd=%d", demikernel_epfd);
errno = EINVAL;
return -1;
}
// We intentionally set the timeout to zero, because
// millisecond timeouts are too coarse-grain for Demikernel.
timeout = 0;
struct timespec abstime = {timeout / 1000, timeout * 1000 * 1000};
demi_qresult_t qr;
int ready_offset;
demi_qtoken_t qts[MAX_EVENTS];
struct demi_event *evs[MAX_EVENTS];
int nevents = epoll_get_ready(demikernel_epfd, qts, evs);
int nret = 0;
if (nevents > 0)
{
int ret = __demi_wait_any(&qr, &ready_offset, qts, nevents, &abstime);
if (ret != 0)
{
ERROR("demi_timedwait() failed - %s", strerror(ret));
return 0;
}
evs[ready_offset]->qr = qr;
switch (evs[ready_offset]->qr.qr_opcode)
{
case DEMI_OPC_ACCEPT:
{
// Fill in event.
events[nret].events = evs[ready_offset]->ev.events;
events[nret].data.fd = evs[ready_offset]->sockqd;
evs[ready_offset]->qt = -1;
nret++;
// Store I/O queue operation result.
queue_man_set_accept_result(evs[ready_offset]->sockqd, evs[ready_offset]);
}
break;
case DEMI_OPC_CONNECT:
{
// TODO: implement.
UNIMPLEMETED("parse result of demi_connect()");
}
break;
case DEMI_OPC_POP:
{
// Fill in event.
events[nret].events = evs[ready_offset]->ev.events;
events[nret].data.fd = evs[ready_offset]->sockqd;
events[nret].data.ptr = evs[ready_offset]->ev.data.ptr;
events[nret].data.u32 = evs[ready_offset]->ev.data.u32;
evs[ready_offset]->qt = -1;
nret++;
// Store I/O queue operation result.
queue_man_set_pop_result(evs[ready_offset]->sockqd, evs[ready_offset]);
}
break;
case DEMI_OPC_PUSH:
{
// TODO: implement.
UNIMPLEMETED("parse result of demi_push()")
}
break;
case DEMI_OPC_FAILED:
{
// Handle timeout: re-issue operation.
if (evs[ready_offset]->qr.qr_ret == ETIMEDOUT)
{
// Check if read was requested.
if (evs[ready_offset]->ev.events & EPOLLIN)
{
demi_qtoken_t qt = -1;
if (queue_man_is_listen_fd(evs[ready_offset]->sockqd))
{
assert(__demi_accept(&qt, evs[ready_offset]->sockqd) == 0);
}
else
{
assert(__demi_pop(&qt, evs[ready_offset]->sockqd) == 0);
}
evs[ready_offset]->qt = qt;
}
// Check if write was requested.
if (evs[ready_offset]->ev.events & EPOLLOUT)
{
// TODO: implement.
UNIMPLEMETED("add EPOLLOUT event");
}
}
WARN("operation failed - %s", strerror(evs[ready_offset]->qr.qr_ret));
errno = EINTR;
return (-1);
}
break;
default:
{
// TODO: implement.
UNIMPLEMETED("signal that Demikernel operation failed");
}
break;
}
}
return (nret);
}