; Apache Kafka Streaming Example
; Demonstrates Kafka producer/consumer operations
; kafka_streaming_example
;
; Purpose:
;   End-to-end walkthrough of the Kafka driver exposed through the
;   'XDLdbDatabase' object: connect, create and list topics, produce
;   plain-text and JSON messages, consume them back, run a small simulated
;   sensor stream, then disconnect.
;
; Parameters:
;   None.
;
; Side effects:
;   Prints progress to the console; creates the topics "xdl-events" and
;   "sensor-data" and publishes messages to them on the broker named in
;   conn_str (localhost:9092). The topics are left in place on exit.
;
; Resource handling:
;   Every result object returned by ExecuteSQL is destroyed once it has
;   been used, and the database object is destroyed on every exit path,
;   including the early return taken when the connection fails.
PRO kafka_streaming_example
  ; Create a database object for Kafka
  objdb = OBJ_NEW('XDLdbDatabase')

  ; Connect to Kafka broker
  ; Format: kafka://broker1:9092,broker2:9092
  ; For local Kafka: kafka://localhost:9092
  conn_str = 'kafka://localhost:9092'
  PRINT, 'Connecting to Kafka...'
  objdb->Connect, CONNECTION=conn_str

  IF objdb->IsConnected() THEN BEGIN
    PRINT, 'Successfully connected to Kafka!'
  ENDIF ELSE BEGIN
    PRINT, 'Connection failed!'
    ; Release the database object before bailing out so a failed
    ; connection attempt does not leave it allocated.
    OBJ_DESTROY, objdb
    RETURN
  ENDELSE

  ; ==================================================================
  ; TOPIC MANAGEMENT
  ; ==================================================================
  PRINT, ''
  PRINT, '=== Topic Management ==='

  ; Create a new topic
  PRINT, 'Creating topic "xdl-events"...'
  create_result = objdb->ExecuteSQL('CREATE TOPIC xdl-events')
  PRINT, create_result->GetData()
  ; Destroy the result like every other ExecuteSQL result in this example.
  create_result->Destroy()

  ; List all topics
  PRINT, ''
  PRINT, 'Listing all topics:'
  topics_result = objdb->ExecuteSQL('LIST TOPICS')
  topics_data = topics_result->GetData()
  PRINT, topics_data
  topics_result->Destroy()

  ; ==================================================================
  ; PRODUCER - Send Messages
  ; ==================================================================
  PRINT, ''
  PRINT, '=== Producing Messages ==='

  ; Send individual messages
  PRINT, 'Sending message 1...'
  result1 = objdb->ExecuteSQL('PRODUCE TO xdl-events: Hello from XDL!')
  PRINT, result1->GetData()
  result1->Destroy()

  PRINT, 'Sending message 2...'
  result2 = objdb->ExecuteSQL('PRODUCE TO xdl-events: Sensor reading: temperature=25.5C')
  PRINT, result2->GetData()
  result2->Destroy()

  PRINT, 'Sending message 3...'
  result3 = objdb->ExecuteSQL('PRODUCE TO xdl-events: Alert: System status OK')
  PRINT, result3->GetData()
  result3->Destroy()

  ; Send structured data (JSON format)
  PRINT, 'Sending JSON message...'
  json_msg = '{"timestamp": 1234567890, "sensor_id": "TEMP001", "value": 25.5, "unit": "celsius"}'
  result4 = objdb->ExecuteSQL('PRODUCE TO xdl-events: ' + json_msg)
  PRINT, result4->GetData()
  result4->Destroy()

  ; Send a short series of generated data points
  FOR i = 1, 5 DO BEGIN
    msg = 'Data point ' + STRTRIM(i,2) + ': value=' + STRTRIM(i*10.0,2)
    result = objdb->ExecuteSQL('PRODUCE TO xdl-events: ' + msg)
    result->Destroy()
  ENDFOR
  PRINT, 'Sent 5 data points'

  ; Wait a moment for messages to be available
  WAIT, 1

  ; ==================================================================
  ; CONSUMER - Read Messages
  ; ==================================================================
  PRINT, ''
  PRINT, '=== Consuming Messages ==='

  ; Consume messages from the topic, capped explicitly at 5 by the
  ; LIMIT clause of the query.
  PRINT, 'Reading first 5 messages from xdl-events:'
  consume_result = objdb->ExecuteSQL('CONSUME FROM xdl-events LIMIT 5')
  n_messages = consume_result->RowCount()
  PRINT, 'Received ' + STRTRIM(n_messages,2) + ' messages'
  PRINT, ''

  ; Display messages (skipped entirely when nothing was received)
  IF n_messages GT 0 THEN BEGIN
    col_names = consume_result->ColumnNames()
    PRINT, 'Columns:', col_names

    messages = consume_result->GetData()
    PRINT, ''
    PRINT, 'Messages:'
    PRINT, messages

    ; Get specific column (payload)
    payloads = consume_result->GetColumn('payload')
    PRINT, ''
    PRINT, 'Message Payloads:'
    FOR i = 0, N_ELEMENTS(payloads)-1 DO BEGIN
      PRINT, ' ' + STRTRIM(i+1,2) + ': ' + payloads[i]
    ENDFOR
  ENDIF
  consume_result->Destroy()

  ; ==================================================================
  ; STREAM PROCESSING EXAMPLE
  ; ==================================================================
  PRINT, ''
  PRINT, '=== Stream Processing Example ==='

  ; Create a sensor data topic. ExecuteSQL is called in function form,
  ; as everywhere else in this procedure, so the returned result object
  ; can be destroyed instead of being dropped.
  sensor_topic_result = objdb->ExecuteSQL('CREATE TOPIC sensor-data')
  sensor_topic_result->Destroy()

  ; Simulate sensor data streaming
  PRINT, 'Simulating sensor data stream...'
  FOR i = 1, 10 DO BEGIN
    ; Uniform random readings: temperature in [20, 30), humidity in [40, 80)
    temperature = 20.0 + RANDOMU(seed) * 10.0
    humidity = 40.0 + RANDOMU(seed) * 40.0
    timestamp = SYSTIME(/SECONDS)

    ; Format as JSON
    ; NOTE(review): STRTRIM on the double returned by SYSTIME(/SECONDS)
    ; relies on default number-to-string formatting, which may not keep
    ; full precision - confirm against the XDL runtime and switch to an
    ; explicit FORMAT if exact timestamps matter.
    sensor_msg = '{"sensor":"DHT22","temp":' + STRTRIM(temperature,2) + $
                 ',"humid":' + STRTRIM(humidity,2) + $
                 ',"time":' + STRTRIM(timestamp,2) + '}'

    result = objdb->ExecuteSQL('PRODUCE TO sensor-data: ' + sensor_msg)
    result->Destroy()
    WAIT, 0.1 ; Small delay between messages
  ENDFOR
  PRINT, 'Sent 10 sensor readings'

  ; Read and process stream data
  WAIT, 1
  PRINT, ''
  PRINT, 'Processing sensor data stream...'
  sensor_result = objdb->ExecuteSQL('CONSUME FROM sensor-data LIMIT 10')
  IF sensor_result->RowCount() GT 0 THEN BEGIN
    payloads = sensor_result->GetColumn('payload')

    ; Parse and analyze (simplified - in real use, parse JSON)
    PRINT, 'Received ' + STRTRIM(N_ELEMENTS(payloads),2) + ' sensor readings'
    PRINT, 'Sample readings:'
    ; Show at most the first three payloads (indices 0..2), fewer when
    ; fewer were received.
    FOR i = 0, MIN([2, N_ELEMENTS(payloads)-1]) DO BEGIN
      PRINT, ' ' + payloads[i]
    ENDFOR
  ENDIF
  sensor_result->Destroy()

  ; ==================================================================
  ; CLEANUP
  ; ==================================================================
  PRINT, ''
  PRINT, '=== Cleanup ==='

  ; Delete test topics (optional)
  ; Note: Topic deletion may require broker configuration
  ; delete_result = objdb->ExecuteSQL('DELETE TOPIC xdl-events')
  ; PRINT, delete_result->GetData()
  ; delete_result->Destroy()

  ; Disconnect
  objdb->Disconnect()
  OBJ_DESTROY, objdb

  PRINT, ''
  PRINT, 'Kafka streaming example completed!'
ENDPRO
; Run the example: main-level call to the kafka_streaming_example procedure
; defined in this file. The procedure takes no arguments.
; NOTE(review): standard IDL expects a main-level program to be terminated
; by an END statement - confirm whether the XDL runtime requires one here.
kafka_streaming_example