#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/select.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include "fluxio.h"
struct conn_data
{
int fd;
fluxio_waker *read_waker;
fluxio_waker *write_waker;
};
static size_t read_cb(void *userdata, fluxio_context *ctx, uint8_t *buf, size_t buf_len)
{
struct conn_data *conn = (struct conn_data *)userdata;
ssize_t ret = read(conn->fd, buf, buf_len);
if (ret >= 0)
{
return ret;
}
if (errno != EAGAIN)
{
return FLUXIO_IO_ERROR;
}
if (conn->read_waker != NULL)
{
fluxio_waker_free(conn->read_waker);
}
conn->read_waker = fluxio_context_waker(ctx);
return FLUXIO_IO_PENDING;
}
static size_t write_cb(void *userdata, fluxio_context *ctx, const uint8_t *buf, size_t buf_len)
{
struct conn_data *conn = (struct conn_data *)userdata;
ssize_t ret = write(conn->fd, buf, buf_len);
if (ret >= 0)
{
return ret;
}
if (errno != EAGAIN)
{
return FLUXIO_IO_ERROR;
}
if (conn->write_waker != NULL)
{
fluxio_waker_free(conn->write_waker);
}
conn->write_waker = fluxio_context_waker(ctx);
return FLUXIO_IO_PENDING;
}
static void free_conn_data(struct conn_data *conn)
{
if (conn->read_waker)
{
fluxio_waker_free(conn->read_waker);
conn->read_waker = NULL;
}
if (conn->write_waker)
{
fluxio_waker_free(conn->write_waker);
conn->write_waker = NULL;
}
free(conn);
}
static int connect_to(const char *host, const char *port)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *result, *rp;
if (getaddrinfo(host, port, &hints, &result) != 0)
{
printf("dns failed for %s\n", host);
return -1;
}
int sfd;
for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1)
{
continue;
}
if (connect(sfd, rp->ai_addr, rp->ai_addrlen) != -1)
{
break;
}
close(sfd);
}
freeaddrinfo(result);
if (rp == NULL)
{
printf("connect failed for %s\n", host);
return -1;
}
return sfd;
}
static int print_each_header(void *userdata,
const uint8_t *name,
size_t name_len,
const uint8_t *value,
size_t value_len)
{
printf("%.*s: %.*s\n", (int)name_len, name, (int)value_len, value);
return FLUXIO_ITER_CONTINUE;
}
static int print_each_chunk(void *userdata, const fluxio_buf *chunk)
{
const uint8_t *buf = fluxio_buf_bytes(chunk);
size_t len = fluxio_buf_len(chunk);
write(1, buf, len);
return FLUXIO_ITER_CONTINUE;
}
typedef enum
{
EXAMPLE_NOT_SET = 0, EXAMPLE_HANDSHAKE,
EXAMPLE_SEND,
EXAMPLE_RESP_BODY
} example_id;
#define STR_ARG(XX) (uint8_t *)XX, strlen(XX)
int main(int argc, char *argv[])
{
const char *host = argc > 1 ? argv[1] : "httpbin.org";
const char *port = argc > 2 ? argv[2] : "80";
const char *path = argc > 3 ? argv[3] : "/";
printf("connecting to port %s on %s...\n", port, host);
int fd = connect_to(host, port);
if (fd < 0)
{
return 1;
}
printf("connected to %s, now get %s\n", host, path);
if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0)
{
printf("failed to set socket to non-blocking\n");
return 1;
}
fd_set fds_read;
fd_set fds_write;
fd_set fds_excep;
struct conn_data *conn = malloc(sizeof(struct conn_data));
conn->fd = fd;
conn->read_waker = NULL;
conn->write_waker = NULL;
fluxio_io *io = fluxio_io_new();
fluxio_io_set_userdata(io, (void *)conn);
fluxio_io_set_read(io, read_cb);
fluxio_io_set_write(io, write_cb);
printf("http handshake (fluxio v%s) ...\n", fluxio_version());
const fluxio_executor *exec = fluxio_executor_new();
fluxio_clientconn_options *opts = fluxio_clientconn_options_new();
fluxio_clientconn_options_exec(opts, exec);
fluxio_task *handshake = fluxio_clientconn_handshake(io, opts);
fluxio_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE);
fluxio_executor_push(exec, handshake);
fluxio_error *err;
while (1)
{
while (1)
{
fluxio_task *task = fluxio_executor_poll(exec);
if (!task)
{
break;
}
switch ((example_id)fluxio_task_userdata(task))
{
case EXAMPLE_HANDSHAKE:;
if (fluxio_task_type(task) == FLUXIO_TASK_ERROR)
{
printf("handshake error!\n");
err = fluxio_task_value(task);
goto fail;
}
assert(fluxio_task_type(task) == FLUXIO_TASK_CLIENTCONN);
printf("preparing http request ...\n");
fluxio_clientconn *client = fluxio_task_value(task);
fluxio_task_free(task);
fluxio_request *req = fluxio_request_new();
if (fluxio_request_set_method(req, STR_ARG("GET")))
{
printf("error setting method\n");
return 1;
}
if (fluxio_request_set_uri(req, STR_ARG(path)))
{
printf("error setting uri\n");
return 1;
}
fluxio_headers *req_headers = fluxio_request_headers(req);
fluxio_headers_set(req_headers, STR_ARG("Host"), STR_ARG(host));
fluxio_task *send = fluxio_clientconn_send(client, req);
fluxio_task_set_userdata(send, (void *)EXAMPLE_SEND);
printf("sending ...\n");
fluxio_executor_push(exec, send);
fluxio_clientconn_free(client);
break;
case EXAMPLE_SEND:;
if (fluxio_task_type(task) == FLUXIO_TASK_ERROR)
{
printf("send error!\n");
err = fluxio_task_value(task);
goto fail;
}
assert(fluxio_task_type(task) == FLUXIO_TASK_RESPONSE);
fluxio_response *resp = fluxio_task_value(task);
fluxio_task_free(task);
uint16_t http_status = fluxio_response_status(resp);
const uint8_t *rp = fluxio_response_reason_phrase(resp);
size_t rp_len = fluxio_response_reason_phrase_len(resp);
printf("\nResponse Status: %d %.*s\n", http_status, (int)rp_len, rp);
fluxio_headers *headers = fluxio_response_headers(resp);
fluxio_headers_foreach(headers, print_each_header, NULL);
printf("\n");
fluxio_body *resp_body = fluxio_response_body(resp);
fluxio_task *foreach = fluxio_body_foreach(resp_body, print_each_chunk, NULL);
fluxio_task_set_userdata(foreach, (void *)EXAMPLE_RESP_BODY);
fluxio_executor_push(exec, foreach);
fluxio_response_free(resp);
break;
case EXAMPLE_RESP_BODY:;
if (fluxio_task_type(task) == FLUXIO_TASK_ERROR)
{
printf("body error!\n");
err = fluxio_task_value(task);
goto fail;
}
assert(fluxio_task_type(task) == FLUXIO_TASK_EMPTY);
printf("\n -- Done! -- \n");
fluxio_task_free(task);
fluxio_executor_free(exec);
free_conn_data(conn);
return 0;
case EXAMPLE_NOT_SET:
fluxio_task_free(task);
break;
}
}
FD_ZERO(&fds_read);
FD_ZERO(&fds_write);
FD_ZERO(&fds_excep);
if (conn->read_waker)
{
FD_SET(conn->fd, &fds_read);
}
if (conn->write_waker)
{
FD_SET(conn->fd, &fds_write);
}
int sel_ret = select(conn->fd + 1, &fds_read, &fds_write, &fds_excep, NULL);
if (sel_ret < 0)
{
printf("select() error\n");
return 1;
}
if (FD_ISSET(conn->fd, &fds_read))
{
fluxio_waker_wake(conn->read_waker);
conn->read_waker = NULL;
}
if (FD_ISSET(conn->fd, &fds_write))
{
fluxio_waker_wake(conn->write_waker);
conn->write_waker = NULL;
}
}
return 0;
fail:
if (err)
{
printf("error code: %d\n", fluxio_error_code(err));
char errbuf[256];
size_t errlen = fluxio_error_print(err, errbuf, sizeof(errbuf));
printf("details: %.*s\n", (int)errlen, errbuf);
fluxio_error_free(err);
}
return 1;
}