#include "dpiImpl.h"
// forward declarations of file-local (static) functions that are defined
// further down in this file
static int dpiQueue__allocateBuffer(dpiQueue *queue, uint32_t numElements,
dpiError *error);
static int dpiQueue__deq(dpiQueue *queue, uint32_t *numProps,
dpiMsgProps **props, dpiError *error);
static void dpiQueue__freeBuffer(dpiQueue *queue, dpiError *error);
static int dpiQueue__getPayloadTDO(dpiQueue *queue, void **tdo,
dpiError *error);
// Allocate and initialize a queue handle for the given connection.
//   conn        - connection the queue belongs to; its reference count is
//                 adjusted by +1 and the pointer is stored on the queue
//   name        - queue name bytes (not required to be NUL-terminated)
//   nameLength  - number of bytes in name
//   payloadType - optional object type; when non-NULL its reference count is
//                 adjusted by +1 and the pointer is stored on the queue
//   queue       - receives the new queue on success
//   isJson      - stored on the queue as its isJson flag
// Returns DPI_SUCCESS or DPI_FAILURE.
int dpiQueue__allocate(dpiConn *conn, const char *name, uint32_t nameLength,
        dpiObjectType *payloadType, dpiQueue **queue, int isJson,
        dpiError *error)
{
    dpiQueue *newQueue;
    char *nameCopy;
    int status;

    // create the handle using the environment of the connection
    status = dpiGen__allocate(DPI_HTYPE_QUEUE, conn->env, (void**) &newQueue,
            error);
    if (status < 0)
        return DPI_FAILURE;

    // retain the connection for as long as the queue refers to it
    dpiGen__setRefCount(conn, error, 1);
    newQueue->conn = conn;
    newQueue->isJson = isJson;

    // retain the payload type only when one was supplied
    if (payloadType) {
        dpiGen__setRefCount(payloadType, error, 1);
        newQueue->payloadType = payloadType;
    }

    // make a private copy of the name with room for a trailing NUL byte; if
    // that fails the partially built queue is released again
    status = dpiUtils__allocateMemory(1, nameLength + 1, 0, "queue name",
            (void**) &nameCopy, error);
    if (status < 0) {
        dpiQueue__free(newQueue, error);
        return DPI_FAILURE;
    }
    memcpy(nameCopy, name, nameLength);
    nameCopy[nameLength] = '\0';
    newQueue->name = nameCopy;

    *queue = newQueue;
    return DPI_SUCCESS;
}
// Replace the queue's internal buffer with arrays able to hold numElements
// entries. Any existing buffer is freed first. The scalar indicator array is
// only allocated when the queue has no payload object type.
//
// The element count recorded in the buffer is only set once every array has
// been allocated. The callers in this file skip reallocation whenever
// buffer.numElements is already large enough, so recording the count before
// the arrays exist would let a later call index arrays that are still NULL
// after a failed allocation.
//
// Returns DPI_SUCCESS or DPI_FAILURE. On failure the arrays that were
// allocated remain attached to the buffer (with a count of zero) and are
// released by the next call to dpiQueue__freeBuffer().
static int dpiQueue__allocateBuffer(dpiQueue *queue, uint32_t numElements,
        dpiError *error)
{
    dpiQueueBuffer *buffer = &queue->buffer;

    // discard the old arrays; capacity is zero until allocation completes
    dpiQueue__freeBuffer(queue, error);
    buffer->numElements = 0;

    if (dpiUtils__allocateMemory(numElements, sizeof(dpiMsgProps*), 1,
            "allocate msg props array", (void**) &buffer->props, error) < 0)
        return DPI_FAILURE;
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate OCI handles array", (void**) &buffer->handles,
            error) < 0)
        return DPI_FAILURE;
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate OCI instances array", (void**) &buffer->instances,
            error) < 0)
        return DPI_FAILURE;
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate OCI indicators array", (void**) &buffer->indicators,
            error) < 0)
        return DPI_FAILURE;

    // scalar indicators are only used when there is no payload object type
    if (!queue->payloadType) {
        if (dpiUtils__allocateMemory(numElements, sizeof(int16_t), 1,
                "allocate array of OCI scalar indicator buffers",
                (void**) &buffer->scalarIndicators, error) < 0)
            return DPI_FAILURE;
    }

    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate message ids array", (void**) &buffer->msgIds,
            error) < 0)
        return DPI_FAILURE;

    // all arrays exist, so the capacity can now be published
    buffer->numElements = numElements;
    return DPI_SUCCESS;
}
// Common entry check for the public enqueue/dequeue functions: starts the
// public function call for the queue handle and then verifies that the
// queue's connection still has a handle and is not being closed.
// Returns DPI_SUCCESS, or DPI_FAILURE with DPI_ERR_NOT_CONNECTED set when
// the connection is unusable.
static int dpiQueue__check(dpiQueue *queue, const char *fnName,
        dpiError *error)
{
    dpiConn *conn;

    if (dpiGen__startPublicFn(queue, DPI_HTYPE_QUEUE, fnName, error) < 0)
        return DPI_FAILURE;

    // the connection is only read after the handle check above has passed
    conn = queue->conn;
    if (conn->handle && !conn->closing)
        return DPI_SUCCESS;
    return dpiError__set(error, "check connection", DPI_ERR_NOT_CONNECTED);
}
// Create the dequeue options for the queue and store them in
// queue->deqOptions. Nothing is stored if creation fails.
// Returns DPI_SUCCESS or DPI_FAILURE.
static int dpiQueue__createDeqOptions(dpiQueue *queue, dpiError *error)
{
    dpiDeqOptions *options;

    if (dpiGen__allocate(DPI_HTYPE_DEQ_OPTIONS, queue->env, (void**) &options,
            error) < 0)
        return DPI_FAILURE;

    // attach to the queue only once the options are fully created
    if (dpiDeqOptions__create(options, queue->conn, error) >= 0) {
        queue->deqOptions = options;
        return DPI_SUCCESS;
    }

    // creation failed: release the handle that was just allocated
    dpiDeqOptions__free(options, error);
    return DPI_FAILURE;
}
// Create the enqueue options for the queue and store them in
// queue->enqOptions. Nothing is stored if creation fails.
// Returns DPI_SUCCESS or DPI_FAILURE.
static int dpiQueue__createEnqOptions(dpiQueue *queue, dpiError *error)
{
    dpiEnqOptions *options;

    if (dpiGen__allocate(DPI_HTYPE_ENQ_OPTIONS, queue->env, (void**) &options,
            error) < 0)
        return DPI_FAILURE;

    // attach to the queue only once the options are fully created
    if (dpiEnqOptions__create(options, queue->conn, error) >= 0) {
        queue->enqOptions = options;
        return DPI_SUCCESS;
    }

    // creation failed: release the handle that was just allocated
    dpiEnqOptions__free(options, error);
    return DPI_FAILURE;
}
// Dequeue messages from the queue.
//   numProps - on input the maximum number of messages to dequeue; on output
//              the number of entries written to props
//   props    - array with room for at least the input value of *numProps
//              entries; receives the message properties that were dequeued
// Message properties are staged in the queue's internal buffer. Those that
// are handed to the caller are removed from the buffer; the rest stay there
// and are reused by the next call.
// Returns DPI_SUCCESS or DPI_FAILURE. On failure error->buffer->offset is set
// to the value of *numProps at that point.
static int dpiQueue__deq(dpiQueue *queue, uint32_t *numProps,
        dpiMsgProps **props, dpiError *error)
{
    dpiMsgProps *prop;
    void *payloadTDO;
    uint32_t i;
    int status;

    // create dequeue options, if necessary
    if (!queue->deqOptions && dpiQueue__createDeqOptions(queue, error) < 0)
        return DPI_FAILURE;

    // grow the internal buffer, if necessary
    if (queue->buffer.numElements < *numProps &&
            dpiQueue__allocateBuffer(queue, *numProps, error) < 0)
        return DPI_FAILURE;

    // populate the buffer
    for (i = 0; i < *numProps; i++) {
        prop = queue->buffer.props[i];

        // create new message properties when the slot is empty
        if (!prop) {
            if (dpiMsgProps__allocate(queue->conn, &prop, error) < 0)
                return DPI_FAILURE;
            queue->buffer.props[i] = prop;
        }

        // create the payload object, if applicable
        if (queue->payloadType && !prop->payloadObj &&
                dpiObject__allocate(queue->payloadType, NULL, NULL, NULL,
                &prop->payloadObj, error) < 0)
            return DPI_FAILURE;

        // create the JSON payload, if applicable; message properties kept
        // from an earlier call already own one, and allocating again would
        // overwrite that pointer and leak the earlier payload
        if (queue->isJson && !prop->payloadJson &&
                dpiJson__allocate(queue->conn, NULL, &prop->payloadJson,
                error) < 0)
            return DPI_FAILURE;

        // set the parallel arrays passed to the OCI wrappers
        queue->buffer.handles[i] = prop->handle;
        if (queue->payloadType) {
            queue->buffer.instances[i] = prop->payloadObj->instance;
            queue->buffer.indicators[i] = prop->payloadObj->indicator;
        } else if (queue->isJson) {
            queue->buffer.instances[i] = prop->payloadJson->handle;
            queue->buffer.indicators[i] = &queue->buffer.scalarIndicators[i];
        } else {
            queue->buffer.instances[i] = prop->payloadRaw;
            queue->buffer.indicators[i] = &queue->buffer.scalarIndicators[i];
        }
        queue->buffer.msgIds[i] = prop->msgIdRaw;
    }

    // perform the dequeue: a single call for one message, one call per
    // message for JSON payloads and an array call otherwise
    if (dpiQueue__getPayloadTDO(queue, &payloadTDO, error) < 0)
        return DPI_FAILURE;
    if (*numProps == 1) {
        status = dpiOci__aqDeq(queue->conn, queue->name,
                queue->deqOptions->handle, queue->buffer.handles[0],
                payloadTDO, queue->buffer.instances, queue->buffer.indicators,
                queue->buffer.msgIds, error);
        if (status < 0)
            *numProps = 0;
    } else if (queue->isJson) {
        status = DPI_SUCCESS;
        for (i = 0; i < *numProps; i++) {
            status = dpiOci__aqDeq(queue->conn, queue->name,
                    queue->deqOptions->handle, queue->buffer.handles[i],
                    payloadTDO, &queue->buffer.instances[i],
                    &queue->buffer.indicators[i],
                    &queue->buffer.msgIds[i], error);

            // stop at the first failure; only the first i messages count
            if (status < 0) {
                *numProps = i;
                break;
            }
        }
    } else {
        status = dpiOci__aqDeqArray(queue->conn, queue->name,
                queue->deqOptions->handle, numProps, queue->buffer.handles,
                payloadTDO, queue->buffer.instances, queue->buffer.indicators,
                queue->buffer.msgIds, error);
    }

    // error code 25228 is not treated as a failure; the messages counted so
    // far are still returned
    // NOTE(review): presumably ORA-25228 (timeout or end-of-fetch during
    // message dequeue) -- confirm against the Oracle error reference
    if (status < 0 && error->buffer->code != 25228) {
        error->buffer->offset = *numProps;
        return DPI_FAILURE;
    }

    // transfer message properties to the destination array; the buffer slot
    // is cleared so the next call allocates fresh properties for it
    for (i = 0; i < *numProps; i++) {
        props[i] = queue->buffer.props[i];
        queue->buffer.props[i] = NULL;
        if (queue->isJson) {
            props[i]->payloadJson->handle = queue->buffer.instances[i];
        } else if (!queue->payloadType) {
            props[i]->payloadRaw = queue->buffer.instances[i];
        }
        props[i]->msgIdRaw = queue->buffer.msgIds[i];
    }

    return DPI_SUCCESS;
}
// Enqueue numProps messages described by the props array.
// Each message must carry a payload whose kind matches the queue (JSON,
// object of the queue's payload type, or raw); otherwise an error is set and
// DPI_FAILURE is returned before anything is enqueued. After a successful
// enqueue the message ids held in the internal buffer are copied back to the
// message properties.
// Returns DPI_SUCCESS or DPI_FAILURE. When the array enqueue fails,
// error->buffer->offset is set to the count left in numProps by that call.
static int dpiQueue__enq(dpiQueue *queue, uint32_t numProps,
        dpiMsgProps **props, dpiError *error)
{
    dpiQueueBuffer *buf = &queue->buffer;
    dpiMsgProps *msg;
    void *payloadTDO;
    int wrongKind;
    uint32_t pos;

    // nothing to do for an empty batch
    if (numProps == 0)
        return DPI_SUCCESS;

    // lazily create the enqueue options and make sure the buffer is large
    // enough for the batch
    if (!queue->enqOptions && dpiQueue__createEnqOptions(queue, error) < 0)
        return DPI_FAILURE;
    if (buf->numElements < numProps &&
            dpiQueue__allocateBuffer(queue, numProps, error) < 0)
        return DPI_FAILURE;

    // validate each message and stage it in the parallel arrays
    for (pos = 0; pos < numProps; pos++) {
        msg = props[pos];

        // a message without any payload cannot be enqueued
        if (!msg->payloadObj && !msg->payloadRaw && !msg->payloadJson)
            return dpiError__set(error, "check payload",
                    DPI_ERR_QUEUE_NO_PAYLOAD);

        // the payload kind must match what the queue was created for
        wrongKind = (queue->isJson && !msg->payloadJson) ||
                (queue->payloadType && !msg->payloadObj) ||
                (!queue->isJson && !queue->payloadType && !msg->payloadRaw);
        if (wrongKind)
            return dpiError__set(error, "check payload",
                    DPI_ERR_QUEUE_WRONG_PAYLOAD_TYPE);

        // an object payload must be of exactly the queue's payload type
        if (queue->payloadType && msg->payloadObj &&
                queue->payloadType->tdo != msg->payloadObj->type->tdo)
            return dpiError__set(error, "check payload",
                    DPI_ERR_WRONG_TYPE,
                    msg->payloadObj->type->schemaLength,
                    msg->payloadObj->type->schema,
                    msg->payloadObj->type->nameLength,
                    msg->payloadObj->type->name,
                    queue->payloadType->schemaLength,
                    queue->payloadType->schema,
                    queue->payloadType->nameLength,
                    queue->payloadType->name);

        buf->handles[pos] = msg->handle;
        if (queue->payloadType) {
            buf->instances[pos] = msg->payloadObj->instance;
            buf->indicators[pos] = msg->payloadObj->indicator;
        } else if (msg->payloadJson) {
            buf->instances[pos] = msg->payloadJson->handle;
            buf->indicators[pos] = &buf->scalarIndicators[pos];
        } else {
            buf->instances[pos] = msg->payloadRaw;
            buf->indicators[pos] = &buf->scalarIndicators[pos];
        }
        buf->msgIds[pos] = msg->msgIdRaw;
    }

    if (dpiQueue__getPayloadTDO(queue, &payloadTDO, error) < 0)
        return DPI_FAILURE;

    // a single message uses the scalar call; anything larger uses the array
    // call, which receives the count by address
    if (numProps == 1) {
        if (dpiOci__aqEnq(queue->conn, queue->name, queue->enqOptions->handle,
                buf->handles[0], payloadTDO, buf->instances, buf->indicators,
                buf->msgIds, error) < 0)
            return DPI_FAILURE;
    } else if (dpiOci__aqEnqArray(queue->conn, queue->name,
            queue->enqOptions->handle, &numProps, buf->handles, payloadTDO,
            buf->instances, buf->indicators, buf->msgIds, error) < 0) {
        error->buffer->offset = numProps;
        return DPI_FAILURE;
    }

    // hand the message ids back to the message properties
    for (pos = 0; pos < numProps; pos++)
        props[pos]->msgIdRaw = buf->msgIds[pos];

    return DPI_SUCCESS;
}
// Release everything the queue refers to and then free the queue structure
// itself. Each member is cleared after it has been released. The queue
// pointer must not be used after this call.
void dpiQueue__free(dpiQueue *queue, dpiError *error)
{
    // drop the reference held on the connection
    if (queue->conn != NULL) {
        dpiGen__setRefCount(queue->conn, error, -1);
        queue->conn = NULL;
    }

    // drop the reference held on the payload object type
    if (queue->payloadType != NULL) {
        dpiGen__setRefCount(queue->payloadType, error, -1);
        queue->payloadType = NULL;
    }

    // the name is a private copy owned by the queue
    if (queue->name != NULL) {
        dpiUtils__freeMemory((void*) queue->name);
        queue->name = NULL;
    }

    // drop the references held on the dequeue and enqueue options
    if (queue->deqOptions != NULL) {
        dpiGen__setRefCount(queue->deqOptions, error, -1);
        queue->deqOptions = NULL;
    }
    if (queue->enqOptions != NULL) {
        dpiGen__setRefCount(queue->enqOptions, error, -1);
        queue->enqOptions = NULL;
    }

    // finally release the internal buffer and the structure itself
    dpiQueue__freeBuffer(queue, error);
    dpiUtils__freeMemory(queue);
}
// Free all arrays of the queue's internal buffer. Message properties still
// held in the buffer have their reference count adjusted by -1 first. Every
// pointer is reset to NULL and the element count to zero, so the function
// may safely be called again on the same buffer.
static void dpiQueue__freeBuffer(dpiQueue *queue, dpiError *error)
{
    dpiQueueBuffer *buffer = &queue->buffer;
    uint32_t i;

    if (buffer->props) {
        // release message properties that were never handed to a caller
        for (i = 0; i < buffer->numElements; i++) {
            if (buffer->props[i]) {
                dpiGen__setRefCount(buffer->props[i], error, -1);
                buffer->props[i] = NULL;
            }
        }
        dpiUtils__freeMemory(buffer->props);
        buffer->props = NULL;
    }
    if (buffer->handles) {
        dpiUtils__freeMemory(buffer->handles);
        buffer->handles = NULL;
    }
    if (buffer->instances) {
        dpiUtils__freeMemory(buffer->instances);
        buffer->instances = NULL;
    }
    if (buffer->indicators) {
        dpiUtils__freeMemory(buffer->indicators);
        buffer->indicators = NULL;
    }
    if (buffer->scalarIndicators) {
        dpiUtils__freeMemory(buffer->scalarIndicators);

        // clear the pointer that was just freed (not buffer->indicators);
        // leaving it set would cause a double free on the next call
        buffer->scalarIndicators = NULL;
    }
    if (buffer->msgIds) {
        dpiUtils__freeMemory(buffer->msgIds);
        buffer->msgIds = NULL;
    }

    // no arrays remain, so the buffer no longer has any capacity
    buffer->numElements = 0;
}
// Determine the TDO to use for the payloads of the queue and store it in
// *tdo: the TDO of the payload object type when the queue has one, otherwise
// the connection's JSON TDO for JSON queues, otherwise the connection's RAW
// TDO. The connection TDOs are requested from the connection before they are
// read.
// Returns DPI_SUCCESS or DPI_FAILURE.
static int dpiQueue__getPayloadTDO(dpiQueue *queue, void **tdo,
        dpiError *error)
{
    // an explicit payload object type takes precedence
    if (queue->payloadType) {
        *tdo = queue->payloadType->tdo;
        return DPI_SUCCESS;
    }

    // JSON payloads
    if (queue->isJson) {
        if (dpiConn__getJsonTDO(queue->conn, error) < 0)
            return DPI_FAILURE;
        *tdo = queue->conn->jsonTDO;
        return DPI_SUCCESS;
    }

    // everything else is a RAW payload
    if (dpiConn__getRawTDO(queue->conn, error) < 0)
        return DPI_FAILURE;
    *tdo = queue->conn->rawTDO;
    return DPI_SUCCESS;
}
// Public entry point: add a reference to the queue. The work is delegated to
// dpiGen__addRef() with the queue handle type and this function's name.
int dpiQueue_addRef(dpiQueue *queue)
{
    const char *fnName = __func__;

    return dpiGen__addRef(queue, DPI_HTYPE_QUEUE, fnName);
}
// Public entry point: dequeue up to *numProps messages into props. On return
// *numProps holds the number of messages dequeued (see dpiQueue__deq()).
// Both pointer arguments are validated by DPI_CHECK_PTR_NOT_NULL before use.
int dpiQueue_deqMany(dpiQueue *queue, uint32_t *numProps, dpiMsgProps **props)
{
    dpiError error;
    int result;

    // validate the handle and its connection
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    DPI_CHECK_PTR_NOT_NULL(queue, numProps)
    DPI_CHECK_PTR_NOT_NULL(queue, props)

    result = dpiQueue__deq(queue, numProps, props, &error);
    return dpiGen__endPublicFn(queue, result, &error);
}
// Public entry point: dequeue a single message. *props receives the message
// properties, or NULL when the dequeue succeeded without returning a message.
int dpiQueue_deqOne(dpiQueue *queue, dpiMsgProps **props)
{
    uint32_t count = 1;
    dpiError error;

    // validate the handle and its connection
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    DPI_CHECK_PTR_NOT_NULL(queue, props)

    if (dpiQueue__deq(queue, &count, props, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    // dpiQueue__deq() leaves *props untouched when nothing was dequeued
    if (count == 0)
        *props = NULL;
    return dpiGen__endPublicFn(queue, DPI_SUCCESS, &error);
}
// Public entry point: enqueue numProps messages. Every entry of props is
// verified to be a message properties handle before anything is enqueued.
int dpiQueue_enqMany(dpiQueue *queue, uint32_t numProps, dpiMsgProps **props)
{
    dpiError error;
    uint32_t pos;
    int result;

    // validate the handle and its connection
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    DPI_CHECK_PTR_NOT_NULL(queue, props)

    // reject the whole batch if any entry is not a valid handle
    for (pos = 0; pos < numProps; pos++) {
        if (dpiGen__checkHandle(props[pos], DPI_HTYPE_MSG_PROPS,
                "check message properties", &error) < 0)
            return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    }

    result = dpiQueue__enq(queue, numProps, props, &error);
    return dpiGen__endPublicFn(queue, result, &error);
}
// Public entry point: enqueue a single message. props is verified to be a
// message properties handle and is then passed on as a one-element array.
int dpiQueue_enqOne(dpiQueue *queue, dpiMsgProps *props)
{
    dpiError error;
    int result;

    // validate the handle and its connection
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    if (dpiGen__checkHandle(props, DPI_HTYPE_MSG_PROPS,
            "check message properties", &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    result = dpiQueue__enq(queue, 1, &props, &error);
    return dpiGen__endPublicFn(queue, result, &error);
}
// Public entry point: return the dequeue options of the queue in *options,
// creating them first if the queue does not have any yet. The pointer stored
// in *options is the one held by the queue.
// Returns DPI_SUCCESS or DPI_FAILURE.
int dpiQueue_getDeqOptions(dpiQueue *queue, dpiDeqOptions **options)
{
    dpiError error;

    // a failed start is routed through dpiGen__endPublicFn() as well, which
    // matches what the other public functions in this file do
    if (dpiGen__startPublicFn(queue, DPI_HTYPE_QUEUE, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, options)

    // create the options on first use
    if (!queue->deqOptions && dpiQueue__createDeqOptions(queue, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    *options = queue->deqOptions;
    return dpiGen__endPublicFn(queue, DPI_SUCCESS, &error);
}
// Public entry point: return the enqueue options of the queue in *options,
// creating them first if the queue does not have any yet. The pointer stored
// in *options is the one held by the queue.
// Returns DPI_SUCCESS or DPI_FAILURE.
int dpiQueue_getEnqOptions(dpiQueue *queue, dpiEnqOptions **options)
{
    dpiError error;

    // a failed start is routed through dpiGen__endPublicFn() as well, which
    // matches what the other public functions in this file do
    if (dpiGen__startPublicFn(queue, DPI_HTYPE_QUEUE, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, options)

    // create the options on first use
    if (!queue->enqOptions && dpiQueue__createEnqOptions(queue, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    *options = queue->enqOptions;
    return dpiGen__endPublicFn(queue, DPI_SUCCESS, &error);
}
// Public entry point: release a reference to the queue. The work is
// delegated to dpiGen__release() with the queue handle type and this
// function's name.
int dpiQueue_release(dpiQueue *queue)
{
    const char *fnName = __func__;

    return dpiGen__release(queue, DPI_HTYPE_QUEUE, fnName);
}